diff --git a/Cargo.lock b/Cargo.lock index 5142f07..c20e6b5 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2481,7 +2481,7 @@ checksum = "a5f43f184355eefb8d17fc948dbecf6c13be3c141f20d834ae842193a448c72a" [[package]] name = "libp2p" version = "0.54.1" -source = "git+https://github.com/anilaltuner/rust-libp2p.git?rev=84b6d6f#84b6d6f34ad18272a74ebe70945ba35cf1df18d4" +source = "git+https://github.com/anilaltuner/rust-libp2p.git?rev=c5cefa1#c5cefa16b1ba2cf09448b6f2b3d11b39a393b651" dependencies = [ "bytes 1.7.1", "either", @@ -2517,7 +2517,7 @@ dependencies = [ [[package]] name = "libp2p-allow-block-list" version = "0.4.0" -source = "git+https://github.com/anilaltuner/rust-libp2p.git?rev=84b6d6f#84b6d6f34ad18272a74ebe70945ba35cf1df18d4" +source = "git+https://github.com/anilaltuner/rust-libp2p.git?rev=c5cefa1#c5cefa16b1ba2cf09448b6f2b3d11b39a393b651" dependencies = [ "libp2p-core", "libp2p-identity", @@ -2528,7 +2528,7 @@ dependencies = [ [[package]] name = "libp2p-autonat" version = "0.13.0" -source = "git+https://github.com/anilaltuner/rust-libp2p.git?rev=84b6d6f#84b6d6f34ad18272a74ebe70945ba35cf1df18d4" +source = "git+https://github.com/anilaltuner/rust-libp2p.git?rev=c5cefa1#c5cefa16b1ba2cf09448b6f2b3d11b39a393b651" dependencies = [ "async-trait", "asynchronous-codec", @@ -2554,7 +2554,7 @@ dependencies = [ [[package]] name = "libp2p-connection-limits" version = "0.4.0" -source = "git+https://github.com/anilaltuner/rust-libp2p.git?rev=84b6d6f#84b6d6f34ad18272a74ebe70945ba35cf1df18d4" +source = "git+https://github.com/anilaltuner/rust-libp2p.git?rev=c5cefa1#c5cefa16b1ba2cf09448b6f2b3d11b39a393b651" dependencies = [ "libp2p-core", "libp2p-identity", @@ -2565,7 +2565,7 @@ dependencies = [ [[package]] name = "libp2p-core" version = "0.42.0" -source = "git+https://github.com/anilaltuner/rust-libp2p.git?rev=84b6d6f#84b6d6f34ad18272a74ebe70945ba35cf1df18d4" +source = "git+https://github.com/anilaltuner/rust-libp2p.git?rev=c5cefa1#c5cefa16b1ba2cf09448b6f2b3d11b39a393b651" dependencies = [ "either", "fnv", @@ -2592,7 +2592,7 @@ dependencies = [ [[package]] name = "libp2p-dcutr" version = "0.12.0" -source = "git+https://github.com/anilaltuner/rust-libp2p.git?rev=84b6d6f#84b6d6f34ad18272a74ebe70945ba35cf1df18d4" +source = "git+https://github.com/anilaltuner/rust-libp2p.git?rev=c5cefa1#c5cefa16b1ba2cf09448b6f2b3d11b39a393b651" dependencies = [ "asynchronous-codec", "either", @@ -2614,7 +2614,7 @@ dependencies = [ [[package]] name = "libp2p-dns" version = "0.42.0" -source = "git+https://github.com/anilaltuner/rust-libp2p.git?rev=84b6d6f#84b6d6f34ad18272a74ebe70945ba35cf1df18d4" +source = "git+https://github.com/anilaltuner/rust-libp2p.git?rev=c5cefa1#c5cefa16b1ba2cf09448b6f2b3d11b39a393b651" dependencies = [ "async-trait", "futures", @@ -2629,7 +2629,7 @@ dependencies = [ [[package]] name = "libp2p-gossipsub" version = "0.47.0" -source = "git+https://github.com/anilaltuner/rust-libp2p.git?rev=84b6d6f#84b6d6f34ad18272a74ebe70945ba35cf1df18d4" +source = "git+https://github.com/anilaltuner/rust-libp2p.git?rev=c5cefa1#c5cefa16b1ba2cf09448b6f2b3d11b39a393b651" dependencies = [ "asynchronous-codec", "base64 0.22.1", @@ -2660,7 +2660,7 @@ dependencies = [ [[package]] name = "libp2p-identify" version = "0.45.0" -source = "git+https://github.com/anilaltuner/rust-libp2p.git?rev=84b6d6f#84b6d6f34ad18272a74ebe70945ba35cf1df18d4" +source = "git+https://github.com/anilaltuner/rust-libp2p.git?rev=c5cefa1#c5cefa16b1ba2cf09448b6f2b3d11b39a393b651" dependencies = [ "asynchronous-codec", "either", @@ -2702,7 +2702,7 @@ dependencies = [ [[package]] name = "libp2p-kad" version = "0.46.1" -source = "git+https://github.com/anilaltuner/rust-libp2p.git?rev=84b6d6f#84b6d6f34ad18272a74ebe70945ba35cf1df18d4" +source = "git+https://github.com/anilaltuner/rust-libp2p.git?rev=c5cefa1#c5cefa16b1ba2cf09448b6f2b3d11b39a393b651" dependencies = [ "arrayvec", "asynchronous-codec", @@ -2730,7 +2730,7 @@ dependencies = [ [[package]] name = "libp2p-mdns" version = "0.46.0" -source = "git+https://github.com/anilaltuner/rust-libp2p.git?rev=84b6d6f#84b6d6f34ad18272a74ebe70945ba35cf1df18d4" +source = "git+https://github.com/anilaltuner/rust-libp2p.git?rev=c5cefa1#c5cefa16b1ba2cf09448b6f2b3d11b39a393b651" dependencies = [ "data-encoding", "futures", @@ -2750,7 +2750,7 @@ dependencies = [ [[package]] name = "libp2p-metrics" version = "0.14.2" -source = "git+https://github.com/anilaltuner/rust-libp2p.git?rev=84b6d6f#84b6d6f34ad18272a74ebe70945ba35cf1df18d4" +source = "git+https://github.com/anilaltuner/rust-libp2p.git?rev=c5cefa1#c5cefa16b1ba2cf09448b6f2b3d11b39a393b651" dependencies = [ "futures", "libp2p-core", @@ -2770,7 +2770,7 @@ dependencies = [ [[package]] name = "libp2p-noise" version = "0.45.0" -source = "git+https://github.com/anilaltuner/rust-libp2p.git?rev=84b6d6f#84b6d6f34ad18272a74ebe70945ba35cf1df18d4" +source = "git+https://github.com/anilaltuner/rust-libp2p.git?rev=c5cefa1#c5cefa16b1ba2cf09448b6f2b3d11b39a393b651" dependencies = [ "asynchronous-codec", "bytes 1.7.1", @@ -2795,7 +2795,7 @@ dependencies = [ [[package]] name = "libp2p-ping" version = "0.45.0" -source = "git+https://github.com/anilaltuner/rust-libp2p.git?rev=84b6d6f#84b6d6f34ad18272a74ebe70945ba35cf1df18d4" +source = "git+https://github.com/anilaltuner/rust-libp2p.git?rev=c5cefa1#c5cefa16b1ba2cf09448b6f2b3d11b39a393b651" dependencies = [ "either", "futures", @@ -2812,7 +2812,7 @@ dependencies = [ [[package]] name = "libp2p-quic" version = "0.11.1" -source = "git+https://github.com/anilaltuner/rust-libp2p.git?rev=84b6d6f#84b6d6f34ad18272a74ebe70945ba35cf1df18d4" +source = "git+https://github.com/anilaltuner/rust-libp2p.git?rev=c5cefa1#c5cefa16b1ba2cf09448b6f2b3d11b39a393b651" dependencies = [ "bytes 1.7.1", "futures", @@ -2835,7 +2835,7 @@ dependencies = [ [[package]] name = "libp2p-relay" version = "0.18.0" -source = "git+https://github.com/anilaltuner/rust-libp2p.git?rev=84b6d6f#84b6d6f34ad18272a74ebe70945ba35cf1df18d4" +source = "git+https://github.com/anilaltuner/rust-libp2p.git?rev=c5cefa1#c5cefa16b1ba2cf09448b6f2b3d11b39a393b651" dependencies = [ "asynchronous-codec", "bytes 1.7.1", @@ -2859,7 +2859,7 @@ dependencies = [ [[package]] name = "libp2p-request-response" version = "0.27.0" -source = "git+https://github.com/anilaltuner/rust-libp2p.git?rev=84b6d6f#84b6d6f34ad18272a74ebe70945ba35cf1df18d4" +source = "git+https://github.com/anilaltuner/rust-libp2p.git?rev=c5cefa1#c5cefa16b1ba2cf09448b6f2b3d11b39a393b651" dependencies = [ "async-trait", "futures", @@ -2878,7 +2878,7 @@ dependencies = [ [[package]] name = "libp2p-swarm" version = "0.45.1" -source = "git+https://github.com/anilaltuner/rust-libp2p.git?rev=84b6d6f#84b6d6f34ad18272a74ebe70945ba35cf1df18d4" +source = "git+https://github.com/anilaltuner/rust-libp2p.git?rev=c5cefa1#c5cefa16b1ba2cf09448b6f2b3d11b39a393b651" dependencies = [ "either", "fnv", @@ -2901,7 +2901,7 @@ dependencies = [ [[package]] name = "libp2p-swarm-derive" version = "0.35.0" -source = "git+https://github.com/anilaltuner/rust-libp2p.git?rev=84b6d6f#84b6d6f34ad18272a74ebe70945ba35cf1df18d4" +source = "git+https://github.com/anilaltuner/rust-libp2p.git?rev=c5cefa1#c5cefa16b1ba2cf09448b6f2b3d11b39a393b651" dependencies = [ "heck 0.5.0", "proc-macro2", @@ -2912,7 +2912,7 @@ dependencies = [ [[package]] name = "libp2p-tcp" version = "0.42.0" -source = "git+https://github.com/anilaltuner/rust-libp2p.git?rev=84b6d6f#84b6d6f34ad18272a74ebe70945ba35cf1df18d4" +source = "git+https://github.com/anilaltuner/rust-libp2p.git?rev=c5cefa1#c5cefa16b1ba2cf09448b6f2b3d11b39a393b651" dependencies = [ "futures", "futures-timer", @@ -2928,7 +2928,7 @@ dependencies = [ [[package]] name = "libp2p-tls" version = "0.5.0" -source = "git+https://github.com/anilaltuner/rust-libp2p.git?rev=84b6d6f#84b6d6f34ad18272a74ebe70945ba35cf1df18d4" +source = "git+https://github.com/anilaltuner/rust-libp2p.git?rev=c5cefa1#c5cefa16b1ba2cf09448b6f2b3d11b39a393b651" dependencies = [ "futures", "futures-rustls", @@ -2946,7 +2946,7 @@ dependencies = [ [[package]] name = "libp2p-upnp" version = "0.3.0" -source = "git+https://github.com/anilaltuner/rust-libp2p.git?rev=84b6d6f#84b6d6f34ad18272a74ebe70945ba35cf1df18d4" +source = "git+https://github.com/anilaltuner/rust-libp2p.git?rev=c5cefa1#c5cefa16b1ba2cf09448b6f2b3d11b39a393b651" dependencies = [ "futures", "futures-timer", @@ -2961,7 +2961,7 @@ dependencies = [ [[package]] name = "libp2p-yamux" version = "0.46.0" -source = "git+https://github.com/anilaltuner/rust-libp2p.git?rev=84b6d6f#84b6d6f34ad18272a74ebe70945ba35cf1df18d4" +source = "git+https://github.com/anilaltuner/rust-libp2p.git?rev=c5cefa1#c5cefa16b1ba2cf09448b6f2b3d11b39a393b651" dependencies = [ "either", "futures", @@ -3338,7 +3338,7 @@ dependencies = [ [[package]] name = "multistream-select" version = "0.13.0" -source = "git+https://github.com/anilaltuner/rust-libp2p.git?rev=84b6d6f#84b6d6f34ad18272a74ebe70945ba35cf1df18d4" +source = "git+https://github.com/anilaltuner/rust-libp2p.git?rev=c5cefa1#c5cefa16b1ba2cf09448b6f2b3d11b39a393b651" dependencies = [ "bytes 1.7.1", "futures", @@ -4089,7 +4089,7 @@ dependencies = [ [[package]] name = "quick-protobuf-codec" version = "0.3.1" -source = "git+https://github.com/anilaltuner/rust-libp2p.git?rev=84b6d6f#84b6d6f34ad18272a74ebe70945ba35cf1df18d4" +source = "git+https://github.com/anilaltuner/rust-libp2p.git?rev=c5cefa1#c5cefa16b1ba2cf09448b6f2b3d11b39a393b651" dependencies = [ "asynchronous-codec", "bytes 1.7.1", @@ -4675,7 +4675,7 @@ checksum = "955d28af4278de8121b7ebeb796b6a45735dc01436d898801014aced2773a3d6" [[package]] name = "rw-stream-sink" version = "0.4.0" -source = "git+https://github.com/anilaltuner/rust-libp2p.git?rev=84b6d6f#84b6d6f34ad18272a74ebe70945ba35cf1df18d4" +source = "git+https://github.com/anilaltuner/rust-libp2p.git?rev=c5cefa1#c5cefa16b1ba2cf09448b6f2b3d11b39a393b651" dependencies = [ "futures", "pin-project", diff --git a/Cargo.toml b/Cargo.toml index 4d3f5a5..f8dc422 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -5,6 +5,7 @@ edition = "2021" license = "Apache-2.0" readme = "README.md" +# profiling build for flamegraphs [profile.profiling] inherits = "release" debug = true @@ -43,7 +44,7 @@ fastbloom-rs = "0.5.9" ollama-workflows = { git = "https://github.com/andthattoo/ollama-workflows", rev = "25467d2" } # peer-to-peer -libp2p = { git = "https://github.com/anilaltuner/rust-libp2p.git", rev = "84b6d6f", features = [ +libp2p = { git = "https://github.com/anilaltuner/rust-libp2p.git", rev = "c5cefa1", features = [ "dcutr", "ping", "relay", @@ -65,9 +66,6 @@ tracing = { version = "0.1.40" } tracing-subscriber = { version = "0.3.18", features = ["env-filter"] } public-ip = "0.2.2" -# TODO: solves ecies dependency issue -# getrandom = "0.2.15" - [dev-dependencies] colored = "2.1.0" diff --git a/src/config/mod.rs b/src/config/mod.rs index a0328a0..31e4c5c 100644 --- a/src/config/mod.rs +++ b/src/config/mod.rs @@ -1,11 +1,13 @@ mod models; mod ollama; +mod openai; use crate::utils::crypto::to_address; use libsecp256k1::{PublicKey, SecretKey}; use models::ModelConfig; use ollama::OllamaConfig; use ollama_workflows::ModelProvider; +use openai::OpenAIConfig; use std::env; @@ -26,6 +28,8 @@ pub struct DriaComputeNodeConfig { /// Even if Ollama is not used, we store the host & port here. /// If Ollama is used, this config will be respected during its instantiations. pub ollama_config: OllamaConfig, + /// OpenAI API key & its service check implementation. + pub openai_config: OpenAIConfig, } /// The default P2P network listen address. @@ -88,8 +92,6 @@ impl DriaComputeNodeConfig { let p2p_listen_addr = env::var("DKN_P2P_LISTEN_ADDR").unwrap_or(DEFAULT_P2P_LISTEN_ADDR.to_string()); - let ollama_config = OllamaConfig::new(); - Self { admin_public_key, secret_key, @@ -97,7 +99,8 @@ impl DriaComputeNodeConfig { address, model_config, p2p_listen_addr, - ollama_config, + ollama_config: OllamaConfig::new(), + openai_config: OpenAIConfig::new(), } } @@ -120,12 +123,12 @@ impl DriaComputeNodeConfig { // if OpenAI is a provider, check that the API key is set if unique_providers.contains(&ModelProvider::OpenAI) { - log::info!("Checking OpenAI requirements"); - const OPENAI_API_KEY: &str = "OPENAI_API_KEY"; - - if std::env::var(OPENAI_API_KEY).is_err() { - return Err("OpenAI API key not found".into()); - } + let openai_models = self + .model_config + .get_models_for_provider(ModelProvider::OpenAI); + self.openai_config + .check(openai_models.into_iter().map(|m| m.to_string()).collect()) + .await?; } Ok(()) diff --git a/src/config/ollama.rs b/src/config/ollama.rs index 2e9ce39..cff10ad 100644 --- a/src/config/ollama.rs +++ b/src/config/ollama.rs @@ -51,7 +51,7 @@ impl OllamaConfig { let auto_pull = std::env::var("OLLAMA_AUTO_PULL").unwrap_or_default() == "true"; - OllamaConfig { + Self { host, port, hardcoded_models, @@ -109,6 +109,7 @@ impl OllamaConfig { } } + log::info!("Ollama setup is all good.",); Ok(()) } } diff --git a/src/config/openai.rs b/src/config/openai.rs new file mode 100644 index 0000000..82e6c61 --- /dev/null +++ b/src/config/openai.rs @@ -0,0 +1,98 @@ +#![allow(unused)] + +use serde::Deserialize; + +const OPENAI_API_KEY: &str = "OPENAI_API_KEY"; + +const OPENAI_MODELS_API: &str = "https://api.openai.com/v1/models"; + +/// [Model](https://platform.openai.com/docs/api-reference/models/object) API object. +#[derive(Debug, Clone, Deserialize)] +struct OpenAIModel { + /// The model identifier, which can be referenced in the API endpoints. + id: String, + /// The Unix timestamp (in seconds) when the model was created. + created: u64, + /// The object type, which is always "model". + object: String, + /// The organization that owns the model. + owned_by: String, +} + +#[derive(Debug, Clone, Deserialize)] +struct OpenAIModelsResponse { + data: Vec, + object: String, +} + +#[derive(Debug, Clone)] +pub struct OpenAIConfig { + pub(crate) api_key: Option, +} + +#[derive(Debug, Clone, Deserialize)] +struct OpenAIError { + message: String, +} + +impl OpenAIConfig { + /// Looks at the environment variables for OpenAI API key. + pub fn new() -> Self { + let api_key = std::env::var(OPENAI_API_KEY).ok(); + + Self { api_key } + } + + /// Check if requested models exist. + pub async fn check(&self, models: Vec) -> Result<(), String> { + log::info!("Checking OpenAI requirements"); + + let Some(api_key) = &self.api_key else { + return Err("OpenAI API key not found".into()); + }; + + let client = reqwest::Client::new(); + let request = client + .get(OPENAI_MODELS_API) + .header("Authorization", format!("Bearer {}", api_key)) + .build() + .map_err(|e| format!("Failed to build request: {}", e))?; + + let response = client + .execute(request) + .await + .map_err(|e| format!("Failed to send request: {}", e))?; + + if response.status().is_client_error() { + return Err(format!( + "Failed to fetch OpenAI models:\n{}", + response.text().await.unwrap_or_default() + )); + } + + let openai_models = response.json::().await.unwrap(); + for requested_model in models { + if !openai_models.data.iter().any(|m| m.id == requested_model) { + return Err(format!( + "Model {} not found in your OpenAI account.", + requested_model + )); + } + } + + log::info!("OpenAI setup is all good.",); + Ok(()) + } +} + +#[cfg(test)] +mod tests { + use super::*; + + #[tokio::test] + async fn test_openai_check() { + let config = OpenAIConfig::new(); + let res = config.check(vec![]).await; + println!("Result: {}", res.unwrap_err()); + } +} diff --git a/src/main.rs b/src/main.rs index b473a8f..641643a 100644 --- a/src/main.rs +++ b/src/main.rs @@ -5,7 +5,7 @@ use dkn_compute::{DriaComputeNode, DriaComputeNodeConfig}; #[tokio::main] async fn main() -> Result<(), Box> { if let Err(e) = dotenvy::dotenv() { - log::warn!("Could not not load .env file: {}", e); + log::warn!("Could not load .env file: {}", e); } env_logger::builder() @@ -18,11 +18,17 @@ async fn main() -> Result<(), Box> { // create configurations & check required services let config = DriaComputeNodeConfig::new(); - config.check_services().await?; + if let Err(err) = config.check_services().await { + log::error!("Error checking services: {}", err); + panic!("Service check failed.") + } // launch the node let mut node = DriaComputeNode::new(config, CancellationToken::new()).await?; - node.launch().await?; + if let Err(err) = node.launch().await { + log::error!("Node error: {}", err); + panic!("Node failed.") + }; Ok(()) } diff --git a/src/p2p/behaviour.rs b/src/p2p/behaviour.rs index 6761bd4..2f35761 100644 --- a/src/p2p/behaviour.rs +++ b/src/p2p/behaviour.rs @@ -36,12 +36,17 @@ impl DriaBehaviour { /// Configures the Kademlia DHT behavior for the node. #[inline] fn create_kademlia_behavior(local_peer_id: PeerId) -> kad::Behaviour { - use kad::{Behaviour, Config}; + use kad::{Behaviour, Caching, Config}; - const QUERY_TIMEOUT_SECS: u64 = 5 * 60; // 5 minutes + const QUERY_TIMEOUT_SECS: u64 = 5 * 60; + const RECORD_TTL_SECS: u64 = 30; let mut cfg = Config::new(DRIA_PROTO_NAME); - cfg.set_query_timeout(Duration::from_secs(QUERY_TIMEOUT_SECS)); + cfg.set_query_timeout(Duration::from_secs(QUERY_TIMEOUT_SECS)) + .set_record_ttl(Some(Duration::from_secs(RECORD_TTL_SECS))) + .set_replication_interval(None) + .set_caching(Caching::Disabled) + .set_publication_interval(None); Behaviour::with_config(local_peer_id, MemoryStore::new(local_peer_id), cfg) } @@ -83,7 +88,10 @@ fn create_gossipsub_behavior(id_keys: Keypair) -> gossipsub::Behaviour { use gossipsub::{Behaviour, ConfigBuilder, Message, MessageAuthenticity, MessageId}; /// Message TTL in seconds - const MESSAGE_TTL: u64 = 100; + const MESSAGE_TTL_SECS: u64 = 100; + + /// Gossip cache TTL in seconds + const GOSSIP_TTL_SECS: u64 = 100; /// Message capacity for the gossipsub cache const MESSAGE_CAPACITY: usize = 100; @@ -91,6 +99,10 @@ fn create_gossipsub_behavior(id_keys: Keypair) -> gossipsub::Behaviour { /// Max transmit size for payloads 256 KB const MAX_TRANSMIT_SIZE: usize = 262144; + /// Max IHAVE length, this is much lower than the default + /// because we don't need historic messages at all + const MAX_IHAVE_LENGTH: usize = 100; + // message id's are simply hashes of the message data let message_id_fn = |message: &Message| { let mut hasher = hash_map::DefaultHasher::new(); @@ -104,9 +116,10 @@ fn create_gossipsub_behavior(id_keys: Keypair) -> gossipsub::Behaviour { .heartbeat_interval(Duration::from_secs(10)) .max_transmit_size(MAX_TRANSMIT_SIZE) // 256 KB .message_id_fn(message_id_fn) - .message_ttl(Duration::from_secs(MESSAGE_TTL)) + .message_ttl(Duration::from_secs(MESSAGE_TTL_SECS)) + .gossip_ttl(Duration::from_secs(GOSSIP_TTL_SECS)) .message_capacity(MESSAGE_CAPACITY) - .max_ihave_length(100) + .max_ihave_length(MAX_IHAVE_LENGTH) .build() .expect("Valid config"), // TODO: better error handling ) diff --git a/src/p2p/client.rs b/src/p2p/client.rs index 5c8767d..d6e9a1b 100644 --- a/src/p2p/client.rs +++ b/src/p2p/client.rs @@ -17,7 +17,8 @@ use super::{DriaBehaviour, DriaBehaviourEvent, DRIA_PROTO_NAME}; pub struct P2PClient { swarm: Swarm, cancellation: CancellationToken, - peer_count: usize, + /// Peer count for (All, Mesh). + peer_count: (usize, usize), peer_last_refreshed: tokio::time::Instant, } @@ -115,7 +116,7 @@ impl P2PClient { Ok(Self { swarm, cancellation, - peer_count: 0, + peer_count: (0, 0), peer_last_refreshed: tokio::time::Instant::now(), }) } @@ -194,37 +195,8 @@ impl P2PClient { /// When a message is received, it will be returned. pub async fn process_events(&mut self) -> Option<(PeerId, MessageId, Message)> { loop { - // do a random walk if it has been sometime since we last refreshed it - if self.peer_last_refreshed.elapsed() > Duration::from_secs(PEER_REFRESH_INTERVAL_SECS) - { - let random_peer = PeerId::random(); - self.swarm - .behaviour_mut() - .kademlia - .get_closest_peers(random_peer); - self.peer_last_refreshed = tokio::time::Instant::now(); - - // print number of peers - let latest_peers = self - .swarm - .behaviour() - .gossipsub - .all_peers() - .collect::>(); - - // print peers if the count has changed - if latest_peers.len() != self.peer_count { - self.peer_count = latest_peers.len(); - log::info!("Peer Count: {}", latest_peers.len()); - log::debug!( - "Peers: {:#?}", - latest_peers - .into_iter() - .map(|(p, _)| p.to_string()) - .collect::>() - ); - } - } + // refresh peers + self.refresh_peer_counts().await; // wait for next event tokio::select! { @@ -323,4 +295,54 @@ impl P2PClient { } } } + + /// Does a random-walk over DHT and updates peer counts as needed. + /// Keeps track of the last time the peer count was refreshed. + /// + /// Should be called in a loop. + /// + /// Returns: (All Peer Count, Mesh Peer Count) + async fn refresh_peer_counts(&mut self) { + if self.peer_last_refreshed.elapsed() > Duration::from_secs(PEER_REFRESH_INTERVAL_SECS) { + let random_peer = PeerId::random(); + self.swarm + .behaviour_mut() + .kademlia + .get_closest_peers(random_peer); + self.peer_last_refreshed = tokio::time::Instant::now(); + + // print number of peers + let gossipsub = &self.swarm.behaviour().gossipsub; + + // print peers if the count has changed + let num_peers = gossipsub.all_peers().count(); + let num_mesh_peers = gossipsub.all_mesh_peers().count(); + if num_peers != self.peer_count.0 || num_mesh_peers != self.peer_count.1 { + self.peer_count = (num_peers, num_mesh_peers); + log::info!( + "Peer Count (mesh / all): {} / {}", + num_mesh_peers, + num_peers + ); + log::debug!( + "All Peers:\n{}", + gossipsub + .all_peers() + .enumerate() + .map(|(i, (p, _))| format!("{:#3}: {}", i, p.to_string())) + .collect::>() + .join("\n") + ); + log::debug!( + "Mesh Peers:\n{}", + gossipsub + .all_mesh_peers() + .enumerate() + .map(|(i, p)| format!("{:#3}: {}", i, p.to_string())) + .collect::>() + .join("\n") + ); + } + } + } }