diff --git a/Cargo.lock b/Cargo.lock index de181f962..08ed822a2 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -893,6 +893,27 @@ dependencies = [ "typenum", ] +[[package]] +name = "csv" +version = "1.3.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ac574ff4d437a7b5ad237ef331c17ccca63c46479e5b5453eb8e10bb99a759fe" +dependencies = [ + "csv-core", + "itoa", + "ryu", + "serde", +] + +[[package]] +name = "csv-core" +version = "0.1.11" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "5efa2b3d7902f4b634a20cae3c9c4e6209dc4779feb6863329607560143efa70" +dependencies = [ + "memchr", +] + [[package]] name = "ctr" version = "0.9.2" @@ -1072,6 +1093,16 @@ dependencies = [ "dirs-sys", ] +[[package]] +name = "dirs-next" +version = "2.0.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b98cf8ebf19c3d1b223e151f99a4f9f0690dca41414773390fc824184ac833e1" +dependencies = [ + "cfg-if", + "dirs-sys-next", +] + [[package]] name = "dirs-sys" version = "0.4.1" @@ -1084,6 +1115,17 @@ dependencies = [ "windows-sys 0.48.0", ] +[[package]] +name = "dirs-sys-next" +version = "0.1.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "4ebda144c4fe02d1f7ea1a7d9641b6fc6b580adcfa024ae48797ecdeb6825b4d" +dependencies = [ + "libc", + "redox_users", + "winapi", +] + [[package]] name = "dotenvy" version = "0.15.7" @@ -1099,6 +1141,12 @@ dependencies = [ "serde", ] +[[package]] +name = "encode_unicode" +version = "1.0.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "34aa73646ffb006b8f5147f3dc182bd4bcb190227ce861fc4a4844bf8e3cb2c0" + [[package]] name = "encoding_rs" version = "0.8.34" @@ -1256,6 +1304,7 @@ dependencies = [ "glob", "http 1.1.0", "pico-args", + "prettytable-rs", "rand", "reqwest", "semver", @@ -2123,6 +2172,17 @@ version = "2.10.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "ddc24109865250148c2e0f3d25d4f0f479571723792d3802153c60922a4fb708" +[[package]] +name = "is-terminal" +version = "0.4.13" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "261f68e344040fbd0edea105bef17c66edf46f984ddb1115b775ce31be948f4b" +dependencies = [ + "hermit-abi 0.4.0", + "libc", + "windows-sys 0.52.0", +] + [[package]] name = "is_terminal_polyfill" version = "1.70.1" @@ -3104,6 +3164,20 @@ dependencies = [ "zerocopy", ] +[[package]] +name = "prettytable-rs" +version = "0.10.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "eea25e07510aa6ab6547308ebe3c036016d162b8da920dbb079e3ba8acf3d95a" +dependencies = [ + "csv", + "encode_unicode", + "is-terminal", + "lazy_static", + "term", + "unicode-width", +] + [[package]] name = "proc-macro-error" version = "1.0.4" @@ -4326,6 +4400,17 @@ dependencies = [ "windows-sys 0.59.0", ] +[[package]] +name = "term" +version = "0.7.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "c59df8ac95d96ff9bede18eb7300b0fda5e5d8d90960e76f8e14ae765eedbf1f" +dependencies = [ + "dirs-next", + "rustversion", + "winapi", +] + [[package]] name = "thiserror" version = "1.0.64" diff --git a/crates/core/src/client_events.rs b/crates/core/src/client_events.rs index 674995f01..59213147e 100644 --- a/crates/core/src/client_events.rs +++ b/crates/core/src/client_events.rs @@ -33,11 +33,6 @@ static CLIENT_ID: AtomicUsize = AtomicUsize::new(1); impl ClientId { pub const FIRST: Self = ClientId(0); - #[cfg(test)] - pub(crate) const fn new(id: usize) -> ClientId { - Self(id) - } - pub fn next() -> Self { ClientId(CLIENT_ID.fetch_add(1, std::sync::atomic::Ordering::SeqCst)) } diff --git a/crates/core/src/client_events/combinator.rs b/crates/core/src/client_events/combinator.rs index 145d4b858..f80219361 100644 --- a/crates/core/src/client_events/combinator.rs +++ b/crates/core/src/client_events/combinator.rs @@ -1,36 +1,33 @@ -use std::future::Future; -use std::pin::Pin; -use std::task::Context; -use std::{collections::HashMap, task::Poll}; +use std::collections::HashMap; use freenet_stdlib::client_api::{ErrorKind, HostResponse}; use futures::future::BoxFuture; -use futures::task::AtomicWaker; -use futures::FutureExt; +use futures::stream::FuturesUnordered; +use futures::{FutureExt, StreamExt}; use tokio::sync::mpsc::{channel, Receiver, Sender}; use super::{BoxedClient, ClientError, ClientId, HostResult, OpenRequest}; type HostIncomingMsg = Result, ClientError>; +type ClientEventsFut = + BoxFuture<'static, (usize, Receiver, Option)>; + /// This type allows combining different sources of events into one and interoperation between them. pub struct ClientEventsCombinator { + pending_futs: FuturesUnordered, /// receiving end of the different client applications from the node clients: [Sender<(ClientId, HostResult)>; N], - /// receiving end of the host node from the different client applications - hosts_rx: [Receiver; N], /// a map of the individual protocols, external, sending client events ids to an internal list of ids external_clients: [HashMap; N], /// a map of the external id to which protocol it belongs (represented by the index in the array) /// and the original id (reverse of indexes) internal_clients: HashMap, - #[allow(clippy::type_complexity)] - pend_futs: - [Option> + Sync + Send + 'static>>>; N], } impl ClientEventsCombinator { pub fn new(clients: [BoxedClient; N]) -> Self { + let pending_futs = FuturesUnordered::new(); let channels = clients.map(|client| { let (tx, rx) = channel(1); let (tx_host, rx_host) = channel(1); @@ -43,49 +40,37 @@ impl ClientEventsCombinator { clients[i] = Some(tx); hosts_rx[i] = Some(rx_host); } - let hosts_rx = hosts_rx.map(|h| h.unwrap()); let external_clients = [(); N].map(|_| HashMap::new()); + + for (i, rx) in hosts_rx.iter_mut().enumerate() { + let Some(mut rx) = rx.take() else { + continue; + }; + pending_futs.push( + async move { + let res = rx.recv().await; + (i, rx, res) + } + .boxed(), + ); + } + Self { clients: clients.map(|c| c.unwrap()), - hosts_rx, external_clients, internal_clients: HashMap::new(), - pend_futs: [(); N].map(|_| None), + pending_futs, } } } impl super::ClientEventsProxy for ClientEventsCombinator { - fn recv<'a>(&'_ mut self) -> BoxFuture<'_, Result, ClientError>> { - Box::pin(async { - let mut futs_opt = [(); N].map(|_| None); - let pend_futs = &mut self.pend_futs; - for (i, pend) in pend_futs.iter_mut().enumerate() { - let fut = &mut futs_opt[i]; - if let Some(pend_fut) = pend.take() { - *fut = Some(pend_fut); - } else { - // this receiver ain't awaiting, queue a new one - // SAFETY: is safe here to extend the lifetime since clients are required to be 'static - // and we take ownership, so they will be alive for the duration of the program - let f = Box::pin(self.hosts_rx[i].recv()) - as Pin + Send + Sync + '_>>; + fn recv(&mut self) -> BoxFuture<'_, Result, ClientError>> { + async { + let Some((idx, mut rx, res)) = self.pending_futs.next().await else { + unreachable!(); + }; - type ExtendedLife<'a, 'b> = Pin< - Box< - dyn Future, ClientError>>> - + Send - + Sync - + 'b, - >, - >; - let new_pend = unsafe { - std::mem::transmute::, ExtendedLife<'_, '_>>(f) - }; - *fut = Some(new_pend); - } - } - let (res, idx, mut others) = select_all(futs_opt.map(|f| f.unwrap())).await; let res = res .map(|res| { match res { @@ -95,19 +80,15 @@ impl super::ClientEventsProxy for ClientEventsCombinator { notification_channel, token, }) => { - tracing::debug!( - "received request; internal_id={external}; req={request}" - ); - let id = - *self.external_clients[idx] - .entry(external) - .or_insert_with(|| { - // add a new mapped external client id - let internal = ClientId::next(); - self.internal_clients.insert(internal, (idx, external)); - internal - }); - + let id = *self.external_clients[idx] + .entry(external) + .or_insert_with(|| { + // add a new mapped external client id + let internal = ClientId::next(); + self.internal_clients.insert(internal, (idx, external)); + internal + }); + tracing::debug!("received request for proxy #{idx}; internal_id={id}; external_id={external}; req={request}"); Ok(OpenRequest { client_id: id, request, @@ -119,15 +100,18 @@ impl super::ClientEventsProxy for ClientEventsCombinator { } }) .unwrap_or_else(|| Err(ErrorKind::TransportProtocolDisconnect.into())); - // place back futs - debug_assert!(pend_futs.iter().all(|f| f.is_none())); - debug_assert_eq!( - others.iter().filter(|a| a.is_some()).count(), - pend_futs.len() - 1 + + self.pending_futs.push( + async move { + let res = rx.recv().await; + (idx, rx, res) + } + .boxed(), ); - std::mem::swap(pend_futs, &mut others); + res - }) + } + .boxed() } fn send<'a>( @@ -135,7 +119,7 @@ impl super::ClientEventsProxy for ClientEventsCombinator { internal: ClientId, response: Result, ) -> BoxFuture<'_, Result<(), ClientError>> { - Box::pin(async move { + async move { let (idx, external) = self .internal_clients .get(&internal) @@ -145,7 +129,8 @@ impl super::ClientEventsProxy for ClientEventsCombinator { .await .map_err(|_| ErrorKind::TransportProtocolDisconnect)?; Ok(()) - }) + } + .boxed() } } @@ -189,62 +174,10 @@ async fn client_fn( tracing::error!("Client shut down"); } -/// An optimized for the use case version of `futures::select_all` which keeps ordering. -#[must_use = "futures do nothing unless you `.await` or poll them"] -struct SelectAll { - waker: AtomicWaker, - inner: [Option; N], -} - -impl Unpin for SelectAll {} - -impl Future for SelectAll { - type Output = (Fut::Output, usize, [Option; N]); - - fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { - macro_rules! recv { - () => { - let item = self - .inner - .iter_mut() - .enumerate() - .find_map(|(i, f)| { - f.as_mut().map(|f| match f.poll_unpin(cx) { - Poll::Pending => None, - Poll::Ready(e) => Some((i, e)), - }) - }) - .flatten(); - match item { - Some((idx, res)) => { - self.inner[idx] = None; - let rest = std::mem::replace(&mut self.inner, [(); N].map(|_| None)); - return Poll::Ready((res, idx, rest)); - } - None => {} - } - }; - } - recv!(); - self.waker.register(cx.waker()); - recv!(); - Poll::Pending - } -} - -fn select_all(iter: [F; N]) -> SelectAll -where - F: Future + Unpin, -{ - SelectAll { - waker: AtomicWaker::new(), - inner: iter.map(|f| Some(f)), - } -} - #[cfg(test)] mod test { use freenet_stdlib::client_api::ClientRequest; + use futures::try_join; use super::*; use crate::client_events::ClientEventsProxy; @@ -252,11 +185,12 @@ mod test { struct SampleProxy { id: usize, rx: Receiver, + tx: Sender, } impl SampleProxy { - fn new(id: usize, rx: Receiver) -> Self { - Self { id, rx } + fn new(id: usize, rx: Receiver, tx: Sender) -> Self { + Self { id, rx, tx } } } @@ -269,9 +203,8 @@ mod test { .await .ok_or_else::(|| ErrorKind::ChannelClosed.into())?; assert_eq!(id, self.id); - eprintln!("#{}, received msg {id}", self.id); Ok(OpenRequest::new( - ClientId::new(id), + ClientId::next(), Box::new(ClientRequest::Disconnect { cause: None }), )) }) @@ -282,38 +215,108 @@ mod test { _id: ClientId, _response: Result, ) -> BoxFuture<'_, Result<(), ClientError>> { - todo!() + async { + self.tx + .send(self.id) + .await + .map_err(|_| ErrorKind::ChannelClosed.into()) + } + .boxed() } } - #[ignore] - #[tokio::test] - async fn combinator_recv() { + fn setup_proxies() -> ([BoxedClient; 3], Vec>, Vec>) { let mut cnt = 0; let mut senders = vec![]; - let proxies = [None::<()>; 3].map(|_| { - let (tx, rx) = channel(1); - senders.push(tx); - let r = Box::new(SampleProxy::new(cnt, rx)) as _; + let mut receivers = vec![]; + let clients = [None::<()>; 3].map(|_| { + let (tx1, rx1) = channel(1); + let (tx2, rx2) = channel(1); + let r = Box::new(SampleProxy::new(cnt, rx1, tx2)) as _; + senders.push(tx1); + receivers.push(rx2); cnt += 1; r }); + (clients, senders, receivers) + } + + #[tokio::test] + async fn test_recv() { + let (proxies, mut senders, _) = setup_proxies(); let mut combinator = ClientEventsCombinator::new(proxies); - let _senders = tokio::task::spawn(async move { - for (id, tx) in senders.iter_mut().enumerate() { - tx.send(id).await.unwrap(); - eprintln!("sent msg {id}"); + let sending = async { + for _ in 1..4 { + for (id, tx) in senders.iter_mut().enumerate() { + tx.send(id).await?; + } + } + Ok::<_, Box>(senders) + }; + + let combinator = async { + let client_ids = combinator + .internal_clients + .keys() + .cloned() + .collect::>(); + for _ in 0..3 { + for id in client_ids.iter() { + let OpenRequest { + client_id: req_id, .. + } = combinator.recv().await?; + assert_eq!(*id, req_id); + } } - senders - }) - .await - .unwrap(); + Ok::<_, Box>(()) + }; - for i in 0..3 { - let OpenRequest { client_id: id, .. } = combinator.recv().await.unwrap(); - eprintln!("received: {id:?}"); - assert_eq!(ClientId::new(i), id); + try_join!(sending, combinator).unwrap(); + } + + #[tokio::test] + async fn test_send() { + let (proxies, mut senders, mut receivers) = setup_proxies(); + let mut combinator = ClientEventsCombinator::new(proxies); + + // Create the internal client mapping implicitly. + for (idx, sender) in senders.iter_mut().enumerate() { + sender.send(idx).await.unwrap(); + combinator.recv().await.unwrap(); } + + let receiving = async { + // Test sending a response through the combinator for each proxy. + for (idx, receiver) in receivers.iter_mut().enumerate() { + // Assert that the receiver received the expected message. + let received_id = receiver + .recv() + .await + .ok_or(format!("missing {idx} sender"))?; + assert_eq!(received_id, idx); + } + Ok::<_, Box>(()) + }; + + let sending = async { + for (i, cli_id) in combinator + .internal_clients + .keys() + .cloned() + .collect::>() + .into_iter() + .enumerate() + { + // Send a sample response through the combinator. + combinator + .send(cli_id, Ok(HostResponse::Ok)) + .await + .map_err(|err| format!("Send failed for client {i}: {err}",))?; + } + Ok::<_, Box>(()) + }; + + try_join!(sending, receiving).unwrap(); } } diff --git a/crates/core/src/client_events/websocket.rs b/crates/core/src/client_events/websocket.rs index c84d43ba9..e54549645 100644 --- a/crates/core/src/client_events/websocket.rs +++ b/crates/core/src/client_events/websocket.rs @@ -211,6 +211,7 @@ async fn websocket_commands( Extension(rs): Extension, ) -> axum::response::Response { let on_upgrade = move |ws: WebSocket| async move { + tracing::debug!(protoc = ?ws.protocol(), "websocket connection established"); if let Err(error) = websocket_interface(rs.clone(), auth_token, encoding_protoc, ws).await { tracing::error!("{error}"); } diff --git a/crates/core/src/message.rs b/crates/core/src/message.rs index c915624d7..e55dd7717 100644 --- a/crates/core/src/message.rs +++ b/crates/core/src/message.rs @@ -313,6 +313,13 @@ pub(crate) enum NodeEvent { Disconnect { cause: Option>, }, + QueryConnections { + callback: tokio::sync::mpsc::Sender, + }, +} + +pub(crate) enum QueryResult { + Connections(Vec), } impl Display for NodeEvent { @@ -330,6 +337,9 @@ impl Display for NodeEvent { NodeEvent::Disconnect { cause: None } => { write!(f, "Disconnect node, reason: unknown") } + NodeEvent::QueryConnections { .. } => { + write!(f, "QueryConnections") + } } } } diff --git a/crates/core/src/node.rs b/crates/core/src/node.rs index 7fd7bb6f6..ce655cd5e 100644 --- a/crates/core/src/node.rs +++ b/crates/core/src/node.rs @@ -22,12 +22,14 @@ use std::{ use anyhow::Context; use either::Either; use freenet_stdlib::{ - client_api::{ClientRequest, ContractRequest, ErrorKind}, + client_api::{ClientRequest, ContractRequest, ErrorKind, HostResponse, QueryResponse}, prelude::{ContractKey, RelatedContracts, WrappedState}, }; +use futures::{stream::FuturesUnordered, StreamExt}; use rsa::pkcs8::DecodePublicKey; use serde::{Deserialize, Serialize}; +use tokio::sync::mpsc; use tracing::Instrument; use self::p2p_impl::NodeP2P; @@ -39,7 +41,7 @@ use crate::{ ExecutorToEventLoopChannel, NetworkContractHandler, }, local_node::Executor, - message::{NetMessage, NodeEvent, Transaction, TransactionType}, + message::{NetMessage, NodeEvent, QueryResult, Transaction, TransactionType}, operations::{ connect::{self, ConnectOp}, get, put, subscribe, update, OpEnum, OpError, OpOutcome, @@ -365,6 +367,7 @@ async fn client_event_handling( ) where ClientEv: ClientEventsProxy + Send + 'static, { + let mut callbacks = FuturesUnordered::new(); loop { tokio::select! { client_request = client_events.recv() => { @@ -387,7 +390,10 @@ async fn client_event_handling( node_controller.send(NodeEvent::Disconnect { cause: cause.clone() }).await.ok(); break; } - process_open_request(req, op_manager.clone()).await; + let cli_id = req.client_id; + if let Some(mut cb) = process_open_request(req, op_manager.clone()).await { + callbacks.push(async move { cb.recv().await.map(|r| (cli_id, r)) }); + } } res = client_responses.recv() => { if let Some((cli_id, res)) = res { @@ -400,12 +406,33 @@ async fn client_event_handling( } } } + res = callbacks.next(), if !callbacks.is_empty() => { + if let Some(Some((cli_id, res))) = res { + let QueryResult::Connections(conns) = res; + let res = Ok(HostResponse::QueryResponse(QueryResponse::ConnectedPeers { + peers: conns.into_iter().map(|p| (p.pub_key.to_string(), p.addr)).collect() } + )); + if let Err(err) = client_events.send(cli_id, res).await { + tracing::debug!("channel closed: {err}"); + break; + } + } + } } } } #[inline] -async fn process_open_request(request: OpenRequest<'static>, op_manager: Arc) { +async fn process_open_request( + request: OpenRequest<'static>, + op_manager: Arc, +) -> Option> { + let (callback_tx, callback_rx) = if matches!(&*request.request, ClientRequest::NodeQueries(_)) { + let (tx, rx) = mpsc::channel(1); + (Some(tx), Some(rx)) + } else { + (None, None) + }; // this will indirectly start actions on the local contract executor let fut = async move { let client_id = request.client_id; @@ -506,6 +533,14 @@ async fn process_open_request(request: OpenRequest<'static>, op_manager: Arc todo!("FIXME: delegate op"), ClientRequest::Disconnect { .. } => unreachable!(), + ClientRequest::NodeQueries(_) => { + tracing::debug!("Received node queries from user event"); + let _ = op_manager + .notify_node_event(NodeEvent::QueryConnections { + callback: callback_tx.expect("should be set"), + }) + .await; + } _ => { tracing::error!("Op not supported"); } @@ -514,6 +549,7 @@ async fn process_open_request(request: OpenRequest<'static>, op_manager: Arc { + let connections = self.connections.keys().cloned().collect(); + callback.send(QueryResult::Connections(connections)).await?; + } NodeEvent::Disconnect { cause } => { tracing::info!( "Disconnecting from network{}", diff --git a/crates/core/src/node/testing_impl.rs b/crates/core/src/node/testing_impl.rs index be8c9ad81..89352d7ac 100644 --- a/crates/core/src/node/testing_impl.rs +++ b/crates/core/src/node/testing_impl.rs @@ -888,6 +888,9 @@ where tracing::info!(peer = %peer_key, "Shutting down node"); return Ok(()); } + NodeEvent::QueryConnections { .. } => { + unimplemented!() + } }, Err(err) => { super::report_result( diff --git a/crates/fdev/Cargo.toml b/crates/fdev/Cargo.toml index 2cedf1029..18bce56f6 100644 --- a/crates/fdev/Cargo.toml +++ b/crates/fdev/Cargo.toml @@ -20,6 +20,7 @@ either = { workspace = true } futures = { workspace = true } glob = "0.3" pico-args = "0.5" +prettytable-rs = "0.10" rand = { workspace = true } serde = "1" serde_json = "1" diff --git a/crates/fdev/src/commands.rs b/crates/fdev/src/commands.rs index e906aa3fd..9b8133d49 100644 --- a/crates/fdev/src/commands.rs +++ b/crates/fdev/src/commands.rs @@ -1,9 +1,4 @@ -use std::{ - fs::File, - io::Read, - net::{IpAddr, SocketAddr}, - path::PathBuf, -}; +use std::{fs::File, io::Read, net::SocketAddr, path::PathBuf}; use freenet::dev_tool::OperationMode; use freenet_stdlib::{ @@ -88,7 +83,8 @@ async fn put_contract( related_contracts, } .into(); - execute_command(request, other, config.address, config.port).await + let mut client = start_api_client(other).await?; + execute_command(request, &mut client).await } async fn put_delegate( @@ -127,7 +123,8 @@ For additional hardening is recommended to use a different cipher and nonce to e nonce, } .into(); - execute_command(request, other, config.address, config.port).await + let mut client = start_api_client(other).await?; + execute_command(request, &mut client).await } pub async fn update(config: UpdateConfig, other: BaseConfig) -> anyhow::Result<()> { @@ -142,14 +139,17 @@ pub async fn update(config: UpdateConfig, other: BaseConfig) -> anyhow::Result<( StateDelta::from(buf).into() }; let request = ContractRequest::Update { key, data }.into(); - execute_command(request, other, config.address, config.port).await + let mut client = start_api_client(other).await?; + execute_command(request, &mut client).await +} + +pub(crate) async fn start_api_client(cfg: BaseConfig) -> anyhow::Result { + v1::start_api_client(cfg).await } -async fn execute_command( +pub(crate) async fn execute_command( request: ClientRequest<'static>, - other: BaseConfig, - address: IpAddr, - port: u16, + api_client: &mut WebApi, ) -> anyhow::Result<()> { - v1::execute_command(request, other, address, port).await + v1::execute_command(request, api_client).await } diff --git a/crates/fdev/src/commands/v1.rs b/crates/fdev/src/commands/v1.rs index 73eec9c05..f9970718f 100644 --- a/crates/fdev/src/commands/v1.rs +++ b/crates/fdev/src/commands/v1.rs @@ -1,13 +1,8 @@ use super::*; -pub(super) async fn execute_command( - request: ClientRequest<'static>, - other: BaseConfig, - address: IpAddr, - port: u16, -) -> anyhow::Result<()> { - let mode = other.mode; - +pub(super) async fn start_api_client(cfg: BaseConfig) -> anyhow::Result { + let mode = cfg.mode; + let address = cfg.address; let target = match mode { OperationMode::Local => { if !address.is_loopback() { @@ -15,9 +10,9 @@ pub(super) async fn execute_command( "invalid ip: {address}, expecting a loopback ip address in local mode" )); } - SocketAddr::new(address, port) + SocketAddr::new(address, cfg.port) } - OperationMode::Network => SocketAddr::new(address, port), + OperationMode::Network => SocketAddr::new(address, cfg.port), }; let (stream, _) = tokio_tungstenite::connect_async(&format!( @@ -30,8 +25,13 @@ pub(super) async fn execute_command( anyhow::anyhow!(format!("fail to connect to the host({target}): {e}")) })?; - WebApi::start(stream) - .send(request) - .await - .map_err(Into::into) + Ok(WebApi::start(stream)) +} + +pub(super) async fn execute_command( + request: ClientRequest<'static>, + api_client: &mut WebApi, +) -> anyhow::Result<()> { + api_client.send(request).await?; + Ok(()) } diff --git a/crates/fdev/src/config.rs b/crates/fdev/src/config.rs index 9eb107815..10e06bc84 100644 --- a/crates/fdev/src/config.rs +++ b/crates/fdev/src/config.rs @@ -27,6 +27,13 @@ pub struct BaseConfig { /// Node operation mode. #[arg(value_enum, default_value_t=OperationMode::Local, env = "MODE")] pub mode: OperationMode, + /// The port of the running local freenet node websocket API. + #[arg(short, long, default_value = "50509", env = "WS_API_PORT")] + pub(crate) port: u16, + /// The ip address of freenet node to publish the contract to. If the node is running in local mode, + /// The default value is `127.0.0.1`. + #[arg(short, long, default_value_t = IpAddr::V4(Ipv4Addr::LOCALHOST))] + pub(crate) address: IpAddr, } #[derive(clap::Subcommand, Clone)] @@ -35,6 +42,8 @@ pub enum SubCommand { Build(BuildToolConfig), Inspect(crate::inspect::InspectConfig), Publish(PutConfig), + /// Query the local node for information. Currently only shows open connections. + Query {}, WasmRuntime(ExecutorConfig), Execute(RunCliConfig), Test(crate::testing::TestConfig), @@ -78,9 +87,6 @@ pub struct UpdateConfig { /// The default value is `127.0.0.1` #[arg(short, long, default_value_t = IpAddr::V4(Ipv4Addr::LOCALHOST))] pub(crate) address: IpAddr, - /// The port of the running local freenet node. - #[arg(short, long, default_value = "50509")] - pub(crate) port: u16, /// A path to the update/delta being pushed to the contract. pub(crate) delta: PathBuf, /// Whether this contract will be updated in the network or is just a dry run @@ -97,15 +103,6 @@ pub struct PutConfig { #[arg(long)] pub(crate) code: PathBuf, - /// The ip address of freenet node to publish the contract to. If the node is running in local mode, - /// The default value is `127.0.0.1`. - #[arg(short, long, default_value_t = IpAddr::V4(Ipv4Addr::LOCALHOST))] - pub(crate) address: IpAddr, - - /// The port of the running local freenet node. - #[arg(short, long, default_value = "50509")] - pub(crate) port: u16, - /// A path to the file parameters for the contract/delegate. If not specified, will be published /// with empty parameters. #[arg(long)] diff --git a/crates/fdev/src/main.rs b/crates/fdev/src/main.rs index 3d24f4c06..aa8107c8c 100644 --- a/crates/fdev/src/main.rs +++ b/crates/fdev/src/main.rs @@ -9,6 +9,7 @@ mod config; mod inspect; pub(crate) mod network_metrics_server; mod new_package; +mod query; mod testing; mod util; mod wasm_runtime; @@ -66,6 +67,10 @@ fn main() -> anyhow::Result<()> { } Ok(()) } + SubCommand::Query {} => { + query::query(config.additional).await?; + Ok(()) + } }; // todo: make all commands return concrete `thiserror` compatible errors so we can use anyhow r.map_err(|e| anyhow::format_err!(e)) diff --git a/crates/fdev/src/query.rs b/crates/fdev/src/query.rs new file mode 100644 index 000000000..d1f946126 --- /dev/null +++ b/crates/fdev/src/query.rs @@ -0,0 +1,40 @@ +use freenet_stdlib::client_api::{ConnectedPeers, HostResponse, QueryResponse}; +use prettytable::{Cell, Row, Table}; + +use crate::{ + commands::{execute_command, start_api_client}, + config::BaseConfig, +}; + +pub async fn query(base_cfg: BaseConfig) -> anyhow::Result<()> { + let mut client = start_api_client(base_cfg).await?; + tracing::info!("Querying for connected peers"); + execute_command( + freenet_stdlib::client_api::ClientRequest::NodeQueries(ConnectedPeers {}), + &mut client, + ) + .await?; + let HostResponse::QueryResponse(QueryResponse::ConnectedPeers { peers }) = + client.recv().await? + else { + anyhow::bail!("Unexpected response from the host"); + }; + + let mut table = Table::new(); + + table.add_row(Row::new(vec![ + Cell::new("Identifier"), + Cell::new("SocketAddress"), + ])); + + for (identifier, socketaddress) in peers { + table.add_row(Row::new(vec![ + Cell::new(&identifier.to_string()), + Cell::new(&socketaddress.to_string()), + ])); + } + + table.printstd(); + + Ok(()) +} diff --git a/stdlib b/stdlib index aa5c7b784..e0769b543 160000 --- a/stdlib +++ b/stdlib @@ -1 +1 @@ -Subproject commit aa5c7b78454d19d3355295e58e622db05df7d3e7 +Subproject commit e0769b543a2006914f7ca9a592963cb47c437b42