Skip to content

Commit

Permalink
p2p: Major refactoring of the handshake protocol
Browse files Browse the repository at this point in the history
Introduce a new protocol InitProtocol which can be used as the core protocol
for initializing a connection with a peer.
Move the handshake logic from the PeerPool module to the protocols directory and
build a handshake protocol that implements InitProtocol trait.
  • Loading branch information
hozan23 committed Jul 15, 2024
1 parent 6c65232 commit e15d3e6
Show file tree
Hide file tree
Showing 18 changed files with 637 additions and 563 deletions.
3 changes: 3 additions & 0 deletions core/src/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,9 @@ pub enum Error {
#[error("Path Not Found Error: {0}")]
PathNotFound(&'static str),

#[error("Event Emit Error: {0}")]
EventEmitError(String),

#[cfg(feature = "crypto")]
#[error(transparent)]
Ed25519(#[from] ed25519_dalek::ed25519::Error),
Expand Down
58 changes: 44 additions & 14 deletions core/src/event.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ use chrono::{DateTime, Utc};
use futures_util::stream::{FuturesUnordered, StreamExt};
use log::{debug, error};

use crate::{async_runtime::lock::Mutex, util::random_32, Result};
use crate::{async_runtime::lock::Mutex, util::random_32, Error, Result};

const CHANNEL_BUFFER_SIZE: usize = 1000;

Expand Down Expand Up @@ -111,13 +111,17 @@ where
///
/// The event must implement the [`EventValueTopic`] trait to indicate the
/// topic of the event. Otherwise, you can use `emit_by_topic()`.
pub async fn emit<E: EventValueTopic<Topic = T> + Clone>(&self, value: &E) {
pub async fn emit<E: EventValueTopic<Topic = T> + Clone>(&self, value: &E) -> Result<()> {
let topic = E::topic();
self.emit_by_topic(&topic, value).await;
self.emit_by_topic(&topic, value).await
}

/// Emits an event to the listeners.
pub async fn emit_by_topic<E: EventValueAny + EventValue + Clone>(&self, topic: &T, value: &E) {
pub async fn emit_by_topic<E: EventValueAny + EventValue + Clone>(
&self,
topic: &T,
value: &E,
) -> Result<()> {
let value: Arc<dyn EventValueAny> = Arc::new(value.clone());
let event = Event::new(value);

Expand All @@ -128,15 +132,21 @@ where
"Failed to emit an event to a non-existent topic {:?}",
topic
);
return;
return Err(Error::EventEmitError(format!(
"Emit an event to a non-existent topic {:?}",
topic,
)));
}

let event_ids = topics.get_mut(topic).unwrap();
let event_id = E::id().to_string();

if !event_ids.contains_key(&event_id) {
debug!("Failed to emit an event: unknown event id {:?}", event_id);
return;
return Err(Error::EventEmitError(format!(
"Emit an event: unknown event id {}",
event_id,
)));
}

let mut results = FuturesUnordered::new();
Expand All @@ -159,17 +169,19 @@ where
for listener_id in failed_listeners.iter() {
listeners.remove(listener_id);
}

Ok(())
}

/// Registers a new event listener for the given topic.
pub async fn register<E: EventValueAny + EventValue + Clone>(
self: &Arc<Self>,
topic: &T,
) -> EventListener<T, E> {
let chan = async_channel::bounded(self.listener_buffer_size);

let topics = &mut self.listeners.lock().await;

let chan = async_channel::bounded(self.listener_buffer_size);

if !topics.contains_key(topic) {
topics.insert(topic.clone(), HashMap::new());
}
Expand Down Expand Up @@ -197,6 +209,16 @@ where
listener
}

/// Remove all topics and event listeners
pub async fn clear(self: &Arc<Self>) {
self.listeners.lock().await.clear();
}

/// Unregisters all event listeners for the given topic.
pub async fn unregister_topic(self: &Arc<Self>, topic: &T) {
self.listeners.lock().await.remove(topic);
}

/// Removes the event listener attached to the given topic.
async fn remove(&self, topic: &T, event_id: &str, listener_id: &EventListenerID) {
let topics = &mut self.listeners.lock().await;
Expand Down Expand Up @@ -419,10 +441,12 @@ mod tests {

event_sys
.emit_by_topic(&Topic::TopicA, &A { a_value: 3 })
.await;
.await
.expect("Emit event");
event_sys
.emit_by_topic(&Topic::TopicB, &B { b_value: 5 })
.await;
.await
.expect("Emit event");

let msg = a_listener.recv().await.unwrap();
assert_eq!(msg, A { a_value: 3 });
Expand All @@ -434,13 +458,17 @@ mod tests {
let c_listener = event_sys.register::<C>(&Topic::TopicC).await;
let d_listener = event_sys.register::<C>(&Topic::TopicD).await;

event_sys.emit(&C { c_value: 10 }).await;
event_sys
.emit(&C { c_value: 10 })
.await
.expect("Emit event");
let msg = c_listener.recv().await.unwrap();
assert_eq!(msg, C { c_value: 10 });

event_sys
.emit_by_topic(&Topic::TopicD, &C { c_value: 10 })
.await;
.await
.expect("Emit event");
let msg = d_listener.recv().await.unwrap();
assert_eq!(msg, C { c_value: 10 });

Expand All @@ -450,14 +478,16 @@ mod tests {

event_sys
.emit_by_topic(&Topic::TopicE, &E { e_value: 5 })
.await;
.await
.expect("Emit event");

let msg = e_listener.recv().await.unwrap();
assert_eq!(msg, E { e_value: 5 });

event_sys
.emit_by_topic(&Topic::TopicE, &F { f_value: 5 })
.await;
.await
.expect("Emit event");

let msg = f_listener.recv().await.unwrap();
assert_eq!(msg, F { f_value: 5 });
Expand Down
45 changes: 18 additions & 27 deletions p2p/README.md
Original file line number Diff line number Diff line change
@@ -1,18 +1,18 @@
# karyon p2p
# Karyon p2p

karyon p2p serves as the foundational stack for the Karyon library. It offers
Karyon p2p serves as the foundational stack for the Karyon library. It offers
a lightweight, extensible, and customizable peer-to-peer (p2p) network stack
that seamlessly integrates with any p2p project.

## Architecture

### Discovery

karyon p2p uses a customized version of the Kademlia for discovering new peers
Karyon p2p uses a customized version of the Kademlia for discovering new peers
in the network. This approach is based on Kademlia but with several significant
differences and optimizations. Some of the main changes:

1. karyon p2p uses TCP for the lookup process, while UDP is used for
1. Karyon p2p uses TCP for the lookup process, while UDP is used for
validating and refreshing the routing table. The reason for this choice is
that the lookup process is infrequent, and the work required to manage
messages with UDP is largely equivalent to using TCP for this purpose.
Expand All @@ -21,11 +21,11 @@ differences and optimizations. Some of the main changes:
use UDP.

2. In contrast to traditional Kademlia, which often employs 160 buckets,
karyon p2p reduces the number of buckets to 32. This optimization is a
Karyon p2p reduces the number of buckets to 32. This optimization is a
result of the observation that most nodes tend to map into the last few
buckets, with the majority of other buckets remaining empty.

3. While Kademlia typically uses a 160-bit key to identify a peer, karyon p2p
3. While Kademlia typically uses a 160-bit key to identify a peer, Karyon p2p
uses a 256-bit key.

> Despite criticisms of Kademlia's vulnerabilities, particularly concerning
Expand All @@ -38,7 +38,7 @@ differences and optimizations. Some of the main changes:
### Peer ID

In the karyon p2p network, each peer is identified by a 256-bit (32-byte) Peer ID.
In the Karyon p2p network, each peer is identified by a 256-bit (32-byte) Peer ID.

### Seeding

Expand Down Expand Up @@ -67,21 +67,20 @@ is added to the `PeerPool`.

### Protocols

In the karyon p2p network, we have two types of protocols: core protocols and
custom protocols. Core protocols are prebuilt into karyon p2p, such as the
Ping protocol used to maintain connections. Custom protocols, on the other
hand, are protocols that you define for your application to provide its core
functionality.
In the Karyon p2p network, there are two types of protocols: core protocols and
custom protocols. Core protocols, such as the Ping and Handshake protocols,
come prebuilt into Karyon p2p. Custom protocols, however, are ones that you
create to provide the specific functionality your application needs.

Here's an example of a custom protocol:

```rust
pub struct NewProtocol {
peer: ArcPeer,
peer: Arc<Peer>,
}

impl NewProtocol {
fn new(peer: Arc<Peer>) -> Arc<Protocol> {
fn new(peer: Arc<Peer>) -> Arc<dyn Protocol> {
Arc::new(Self {
peer,
})
Expand All @@ -90,12 +89,9 @@ impl NewProtocol {

#[async_trait]
impl Protocol for NewProtocol {
async fn start(self: Arc<Self>) -> Result<(), P2pError> {
let listener = self.peer.register_listener::<Self>().await;
async fn start(self: Arc<Self>) -> Result<(), Error> {
loop {
let event = listener.recv().await.unwrap();

match event {
match self.peer.recv::<Self>().await.expect("Receive msg") {
ProtocolEvent::Message(msg) => {
println!("{:?}", msg);
}
Expand All @@ -104,12 +100,10 @@ impl Protocol for NewProtocol {
}
}
}

listener.cancel().await;
Ok(())
}

fn version() -> Result<Version, P2pError> {
fn version() -> Result<Version, Error> {
"0.2.0, >0.1.0".parse()
}

Expand All @@ -120,20 +114,17 @@ impl Protocol for NewProtocol {

```

Whenever a new peer is added to the `PeerPool`, all the protocols, including
the custom protocols, will automatically start running with the newly connected peer.

## Network Security

Using TLS is possible for all inbound and outbound connections by enabling the
boolean `enable_tls` field in the configuration. However, implementing TLS for
a P2P network is not trivial and is still unstable, requiring a comprehensive
a p2p network is not trivial and is still unstable, requiring a comprehensive
audit.


## Choosing the async runtime

karyon p2p currently supports both **smol(async-std)** and **tokio** async runtimes.
Karyon p2p currently supports both **smol(async-std)** and **tokio** async runtimes.
The default is **smol**, but if you want to use **tokio**, you need to disable
the default features and then select the `tokio` feature.

Expand Down
6 changes: 1 addition & 5 deletions p2p/examples/chat.rs
Original file line number Diff line number Diff line change
Expand Up @@ -69,11 +69,8 @@ impl Protocol for ChatProtocol {
}
});

let listener = self.peer.register_listener::<Self>().await;
loop {
let event = listener.recv().await.expect("Receive new protocol event");

match event {
match self.peer.recv::<Self>().await? {
ProtocolEvent::Message(msg) => {
let msg = String::from_utf8(msg).expect("Convert received bytes to string");
println!("{msg}");
Expand All @@ -85,7 +82,6 @@ impl Protocol for ChatProtocol {
}

task.cancel().await;
listener.cancel().await;
Ok(())
}

Expand Down
53 changes: 16 additions & 37 deletions p2p/src/conn_queue.rs
Original file line number Diff line number Diff line change
@@ -1,37 +1,13 @@
use std::{collections::VecDeque, fmt, sync::Arc};

use async_channel::Sender;
use std::{collections::VecDeque, sync::Arc};

use karyon_core::{async_runtime::lock::Mutex, async_util::CondVar};
use karyon_net::Conn;

use crate::{message::NetMsg, Result};

/// Defines the direction of a network connection.
#[derive(Clone, Debug)]
pub enum ConnDirection {
Inbound,
Outbound,
}

impl fmt::Display for ConnDirection {
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
match self {
ConnDirection::Inbound => write!(f, "Inbound"),
ConnDirection::Outbound => write!(f, "Outbound"),
}
}
}

pub struct NewConn {
pub direction: ConnDirection,
pub conn: Conn<NetMsg>,
pub disconnect_signal: Sender<Result<()>>,
}
use crate::{connection::ConnDirection, connection::Connection, message::NetMsg, Result};

/// Connection queue
pub struct ConnQueue {
queue: Mutex<VecDeque<NewConn>>,
queue: Mutex<VecDeque<Connection>>,
conn_available: CondVar,
}

Expand All @@ -43,24 +19,27 @@ impl ConnQueue {
})
}

/// Push a connection into the queue and wait for the disconnect signal
/// Handle a connection by pushing it into the queue and wait for the disconnect signal
pub async fn handle(&self, conn: Conn<NetMsg>, direction: ConnDirection) -> Result<()> {
let (disconnect_signal, chan) = async_channel::bounded(1);
let new_conn = NewConn {
direction,
conn,
disconnect_signal,
};
let endpoint = conn.peer_endpoint()?;

let (disconnect_tx, disconnect_rx) = async_channel::bounded(1);
let new_conn = Connection::new(conn, disconnect_tx, direction, endpoint);

// Push a new conn to the queue
self.queue.lock().await.push_back(new_conn);
self.conn_available.signal();
if let Ok(result) = chan.recv().await {

// Wait for the disconnect signal from the connection handler
if let Ok(result) = disconnect_rx.recv().await {
return result;
}

Ok(())
}

/// Receive the next connection in the queue
pub async fn next(&self) -> NewConn {
/// Waits for the next connection in the queue
pub async fn next(&self) -> Connection {
let mut queue = self.queue.lock().await;
while queue.is_empty() {
queue = self.conn_available.wait(queue).await;
Expand Down
Loading

0 comments on commit e15d3e6

Please sign in to comment.