diff --git a/Cargo.lock b/Cargo.lock index e6e32b83..245f8fae 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -530,12 +530,12 @@ dependencies = [ "hex", "indexmap", "itertools 0.13.0", - "log", "rand", "rand_chacha", "thiserror", "tokio", "tokio-tungstenite", + "tracing", ] [[package]] diff --git a/Cargo.toml b/Cargo.toml index 53ae5800..e41ef159 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -122,7 +122,6 @@ tungstenite = "0.21.0" native-tls = "0.2.11" rustls = "0.22.0" rustls-pemfile = "2.1.3" -log = "0.4.21" flate2 = "1.0.30" once_cell = "1.19.0" num-bigint = "0.4.6" diff --git a/crates/chia-sdk-test/Cargo.toml b/crates/chia-sdk-test/Cargo.toml index a8044576..8b76582b 100644 --- a/crates/chia-sdk-test/Cargo.toml +++ b/crates/chia-sdk-test/Cargo.toml @@ -27,7 +27,7 @@ futures-channel = { workspace = true, features = ["sink"] } futures-util = { workspace = true } indexmap = { workspace = true } thiserror = { workspace = true } -log = { workspace = true } +tracing = { workspace = true } itertools = { workspace = true } rand = { workspace = true } rand_chacha = { workspace = true } diff --git a/crates/chia-sdk-test/src/peer_simulator.rs b/crates/chia-sdk-test/src/peer_simulator.rs index ecfd75e4..b7022dbf 100644 --- a/crates/chia-sdk-test/src/peer_simulator.rs +++ b/crates/chia-sdk-test/src/peer_simulator.rs @@ -37,7 +37,7 @@ impl PeerSimulator { } pub async fn with_config(config: SimulatorConfig) -> Result { - log::info!("starting simulator"); + tracing::info!("starting simulator"); let addr = "127.0.0.1:0"; let peer_map = PeerMap::default(); @@ -60,7 +60,7 @@ impl PeerSimulator { let stream = match tokio_tungstenite::accept_async(stream).await { Ok(stream) => stream, Err(error) => { - log::error!("error accepting websocket connection: {}", error); + tracing::error!("error accepting websocket connection: {}", error); continue; } }; @@ -88,10 +88,8 @@ impl PeerSimulator { &self.config } - pub async fn connect_split( - &self, - ) -> Result<(Peer, mpsc::Receiver), PeerSimulatorError> { - log::info!("connecting new peer to simulator"); + pub async fn connect_raw(&self) -> Result<(Peer, mpsc::Receiver), PeerSimulatorError> { + tracing::info!("connecting new peer to simulator"); let (ws, _) = connect_async(format!("ws://{}", self.addr)).await?; Ok(Peer::from_websocket( ws, @@ -101,12 +99,23 @@ impl PeerSimulator { )?) } + pub async fn connect_split( + &self, + ) -> Result<(Peer, mpsc::Receiver), PeerSimulatorError> { + let (peer, mut receiver) = self.connect_raw().await?; + receiver + .recv() + .await + .expect("expected NewPeakWallet message"); + Ok((peer, receiver)) + } + pub async fn connect(&self) -> Result { let (peer, mut receiver) = self.connect_split().await?; tokio::spawn(async move { while let Some(message) = receiver.recv().await { - log::debug!("received message: {message:?}"); + tracing::debug!("received message: {message:?}"); } }); diff --git a/crates/chia-sdk-test/src/peer_simulator/simulator_config.rs b/crates/chia-sdk-test/src/peer_simulator/simulator_config.rs index 61496cca..34289003 100644 --- a/crates/chia-sdk-test/src/peer_simulator/simulator_config.rs +++ b/crates/chia-sdk-test/src/peer_simulator/simulator_config.rs @@ -1,5 +1,5 @@ use chia_consensus::consensus_constants::ConsensusConstants; -use chia_sdk_types::MAINNET_CONSTANTS; +use chia_sdk_types::TESTNET11_CONSTANTS; #[derive(Debug, Clone, PartialEq, Eq)] pub struct SimulatorConfig { @@ -12,7 +12,7 @@ pub struct SimulatorConfig { impl Default for SimulatorConfig { fn default() -> Self { Self { - constants: MAINNET_CONSTANTS.clone(), + constants: TESTNET11_CONSTANTS.clone(), max_subscriptions: 200_000, max_response_coins: 100_000, puzzle_state_batch_size: 30_000, diff --git a/crates/chia-sdk-test/src/peer_simulator/subscriptions.rs b/crates/chia-sdk-test/src/peer_simulator/subscriptions.rs index e00c02f3..36362e57 100644 --- a/crates/chia-sdk-test/src/peer_simulator/subscriptions.rs +++ b/crates/chia-sdk-test/src/peer_simulator/subscriptions.rs @@ -28,6 +28,64 @@ impl Subscriptions { .extend(puzzle_hashes); } + pub(crate) fn remove_coin_subscriptions( + &mut self, + peer: SocketAddr, + coin_ids: &[Bytes32], + ) -> Vec { + let mut removed = Vec::new(); + + if let Some(subscriptions) = self.coin_subscriptions.get_mut(&peer) { + for coin_id in coin_ids { + if subscriptions.swap_remove(coin_id) { + removed.push(*coin_id); + } + } + if subscriptions.is_empty() { + self.coin_subscriptions.swap_remove(&peer); + } + } + + removed + } + + pub(crate) fn remove_puzzle_subscriptions( + &mut self, + peer: SocketAddr, + puzzle_hashes: &[Bytes32], + ) -> Vec { + let mut removed = Vec::new(); + + if let Some(subscriptions) = self.puzzle_subscriptions.get_mut(&peer) { + for puzzle_hash in puzzle_hashes { + if subscriptions.swap_remove(puzzle_hash) { + removed.push(*puzzle_hash); + } + } + if subscriptions.is_empty() { + self.puzzle_subscriptions.swap_remove(&peer); + } + } + + removed + } + + pub(crate) fn remove_all_coin_subscriptions(&mut self, peer: SocketAddr) -> Vec { + self.coin_subscriptions + .swap_remove(&peer) + .unwrap_or_default() + .into_iter() + .collect() + } + + pub(crate) fn remove_all_puzzle_subscriptions(&mut self, peer: SocketAddr) -> Vec { + self.puzzle_subscriptions + .swap_remove(&peer) + .unwrap_or_default() + .into_iter() + .collect() + } + pub(crate) fn subscription_count(&self, peer: SocketAddr) -> usize { self.coin_subscriptions.get(&peer).map_or(0, IndexSet::len) + self diff --git a/crates/chia-sdk-test/src/peer_simulator/ws_connection.rs b/crates/chia-sdk-test/src/peer_simulator/ws_connection.rs index c99c1483..9ef60f4d 100644 --- a/crates/chia-sdk-test/src/peer_simulator/ws_connection.rs +++ b/crates/chia-sdk-test/src/peer_simulator/ws_connection.rs @@ -5,13 +5,14 @@ use chia_protocol::{ Bytes, Bytes32, CoinState, CoinStateUpdate, Message, NewPeakWallet, ProtocolMessageTypes, PuzzleSolutionResponse, RegisterForCoinUpdates, RegisterForPhUpdates, RejectCoinState, RejectPuzzleSolution, RejectPuzzleState, RejectStateReason, RequestChildren, RequestCoinState, - RequestPuzzleSolution, RequestPuzzleState, RespondChildren, RespondCoinState, - RespondPuzzleSolution, RespondPuzzleState, RespondToCoinUpdates, RespondToPhUpdates, - SendTransaction, SpendBundle, TransactionAck, + RequestPuzzleSolution, RequestPuzzleState, RequestRemoveCoinSubscriptions, + RequestRemovePuzzleSubscriptions, RespondChildren, RespondCoinState, RespondPuzzleSolution, + RespondPuzzleState, RespondRemoveCoinSubscriptions, RespondRemovePuzzleSubscriptions, + RespondToCoinUpdates, RespondToPhUpdates, SendTransaction, SpendBundle, TransactionAck, }; use chia_traits::Streamable; use clvmr::NodePtr; -use futures_channel::mpsc; +use futures_channel::mpsc::{self, UnboundedSender}; use futures_util::{SinkExt, StreamExt}; use indexmap::{IndexMap, IndexSet}; use itertools::Itertools; @@ -19,7 +20,10 @@ use tokio::{ net::TcpStream, sync::{Mutex, MutexGuard}, }; -use tokio_tungstenite::{tungstenite::Message as WsMessage, WebSocketStream}; +use tokio_tungstenite::{ + tungstenite::{self, Message as WsMessage}, + WebSocketStream, +}; use crate::{Simulator, SimulatorError}; @@ -36,7 +40,13 @@ pub(crate) async fn ws_connection( simulator: Arc>, subscriptions: Arc>, ) { - let (tx, mut rx) = mpsc::unbounded(); + let (mut tx, mut rx) = mpsc::unbounded(); + + if let Err(error) = handle_initial_peak(&mut tx, &simulator).await { + tracing::error!("error sending initial peak: {}", error); + return; + } + peer_map.insert(addr, tx.clone()).await; let (mut sink, mut stream) = ws.split(); @@ -44,7 +54,7 @@ pub(crate) async fn ws_connection( tokio::spawn(async move { while let Some(message) = rx.next().await { if let Err(error) = sink.send(message).await { - log::error!("error sending message to peer: {}", error); + tracing::error!("error sending message to peer: {}", error); continue; } } @@ -54,7 +64,7 @@ pub(crate) async fn ws_connection( let message = match message { Ok(message) => message, Err(error) => { - log::info!("received error from stream: {:?}", error); + tracing::info!("received error from stream: {:?}", error); break; } }; @@ -70,7 +80,7 @@ pub(crate) async fn ws_connection( ) .await { - log::error!("error handling message: {}", error); + tracing::error!("error handling message: {}", error); break; } } @@ -78,6 +88,32 @@ pub(crate) async fn ws_connection( peer_map.remove(addr).await; } +async fn handle_initial_peak( + tx: &mut UnboundedSender, + sim: &Mutex, +) -> Result<(), PeerSimulatorError> { + let (header_hash, height) = { + let sim = sim.lock().await; + (sim.header_hash(), sim.height()) + }; + + tx.send( + Message { + msg_type: ProtocolMessageTypes::NewPeakWallet, + id: None, + data: NewPeakWallet::new(header_hash, height, 0, height) + .to_bytes() + .unwrap() + .into(), + } + .to_bytes()? + .into(), + ) + .await?; + + Ok(()) +} + async fn handle_message( peer_map: PeerMap, config: &SimulatorConfig, @@ -131,6 +167,24 @@ async fn handle_message( let response = request_puzzle_state(addr, request, config, &simulator, subscriptions)?; (ProtocolMessageTypes::RespondPuzzleState, response) } + ProtocolMessageTypes::RequestRemoveCoinSubscriptions => { + let request = RequestRemoveCoinSubscriptions::from_bytes(&request.data)?; + let mut subscriptions = subscriptions.lock().await; + let response = request_remove_coin_subscriptions(addr, request, &mut subscriptions)?; + ( + ProtocolMessageTypes::RespondRemoveCoinSubscriptions, + response, + ) + } + ProtocolMessageTypes::RequestRemovePuzzleSubscriptions => { + let request = RequestRemovePuzzleSubscriptions::from_bytes(&request.data)?; + let mut subscriptions = subscriptions.lock().await; + let response = request_remove_puzzle_subscriptions(addr, request, &mut subscriptions)?; + ( + ProtocolMessageTypes::RespondRemovePuzzleSubscriptions, + response, + ) + } message_type => { return Err(PeerSimulatorError::UnsupportedMessage(message_type)); } @@ -215,7 +269,7 @@ async fn send_transaction( let updates = match new_transaction(&mut simulator, &mut subscriptions, request.transaction) { Ok(updates) => updates, Err(error) => { - log::error!("error processing transaction: {:?}", &error); + tracing::error!("error processing transaction: {:?}", &error); let error_code = match error { PeerSimulatorError::Simulator(SimulatorError::Validation(error_code)) => error_code, @@ -246,7 +300,7 @@ async fn send_transaction( // Send updates to peers. for (addr, mut peer) in peer_map.peers().await { - peer.send(new_peak.clone().into()).await.unwrap(); + peer.send(new_peak.clone().into()).await?; let Some(peer_updates) = updates.get(&addr).cloned() else { continue; @@ -444,12 +498,12 @@ fn request_puzzle_state( ) -> Result { if let Some(previous_height) = request.previous_height { if Some(request.header_hash) != simulator.header_hash_of(previous_height) { - return Ok(RejectCoinState::new(RejectStateReason::Reorg) + return Ok(RejectPuzzleState::new(RejectStateReason::Reorg) .to_bytes()? .into()); } } else if request.header_hash != config.constants.genesis_challenge { - return Ok(RejectCoinState::new(RejectStateReason::Reorg) + return Ok(RejectPuzzleState::new(RejectStateReason::Reorg) .to_bytes()? .into()); } @@ -523,3 +577,35 @@ fn request_puzzle_state( .to_bytes()? .into()) } + +fn request_remove_coin_subscriptions( + peer: SocketAddr, + request: RequestRemoveCoinSubscriptions, + subscriptions: &mut MutexGuard<'_, Subscriptions>, +) -> Result { + let coin_ids = if let Some(coin_ids) = request.coin_ids { + subscriptions.remove_coin_subscriptions(peer, &coin_ids) + } else { + subscriptions.remove_all_coin_subscriptions(peer) + }; + + Ok(RespondRemoveCoinSubscriptions { coin_ids } + .to_bytes()? + .into()) +} + +fn request_remove_puzzle_subscriptions( + peer: SocketAddr, + request: RequestRemovePuzzleSubscriptions, + subscriptions: &mut MutexGuard<'_, Subscriptions>, +) -> Result { + let puzzle_hashes = if let Some(puzzle_hashes) = request.puzzle_hashes { + subscriptions.remove_puzzle_subscriptions(peer, &puzzle_hashes) + } else { + subscriptions.remove_all_puzzle_subscriptions(peer) + }; + + Ok(RespondRemovePuzzleSubscriptions { puzzle_hashes } + .to_bytes()? + .into()) +}