diff --git a/src/bin/hyle.rs b/src/bin/hyle.rs index f0678e529..31a42cebe 100644 --- a/src/bin/hyle.rs +++ b/src/bin/hyle.rs @@ -152,19 +152,6 @@ async fn main() -> Result<()> { }; let mut handler = ModulesHandler::new(&bus).await; - handler.build_module::(ctx.clone()).await?; - - handler.build_module::(ctx.clone()).await?; - if config.single_node.unwrap_or(false) { - handler - .build_module::(ctx.clone()) - .await?; - } else { - handler.build_module::(ctx.clone()).await?; - } - handler - .build_module::(ctx.clone()) - .await?; if run_indexer { handler.build_module::(ctx.common.clone()).await?; @@ -187,11 +174,27 @@ async fn main() -> Result<()> { }) .await?; } + + handler + .build_module::(ctx.common.clone()) + .await?; + handler .build_module::(ctx.clone()) .await?; + + handler.build_module::(ctx.clone()).await?; + + handler.build_module::(ctx.clone()).await?; + if config.single_node.unwrap_or(false) { + handler + .build_module::(ctx.clone()) + .await?; + } else { + handler.build_module::(ctx.clone()).await?; + } handler - .build_module::(ctx.common.clone()) + .build_module::(ctx.clone()) .await?; handler.build_module::(ctx.clone()).await?; diff --git a/src/bus/command_response.rs b/src/bus/command_response.rs index 3ff57652e..6acf4f356 100644 --- a/src/bus/command_response.rs +++ b/src/bus/command_response.rs @@ -168,10 +168,13 @@ macro_rules! handle_messages { } } } + while let Ok($res) = $index.try_recv() { receive_bus_metrics::<$message, _>(&mut $bus); $handler; }; + + tracing::trace!("Remaining messages in topic {}: {}", stringify!($message), $index.len()); }; // Fallback to else case diff --git a/src/data_availability.rs b/src/data_availability.rs index 098e91f65..7d7ce5c4a 100644 --- a/src/data_availability.rs +++ b/src/data_availability.rs @@ -9,13 +9,12 @@ mod blocks_memory; use blocks_fjall::Blocks; //use blocks_memory::Blocks; -use codec::{codec_data_availability, DataAvailabilityEvent}; +use codec::{codec_data_availability, DataAvailabilityEvent, DataAvailabilityRequest}; use crate::{ bus::{BusClientSender, BusMessage}, consensus::{ConsensusCommand, ConsensusEvent}, genesis::GenesisEvent, - indexer::da_listener::RawDAListener, mempool::{MempoolBlockEvent, MempoolStatusEvent}, model::*, module_handle_messages, @@ -27,7 +26,7 @@ use crate::{ modules::{module_bus_client, Module}, }, }; -use anyhow::{bail, Context, Error, Result}; +use anyhow::{Context, Error, Result}; use borsh::{BorshDeserialize, BorshSerialize}; use core::str; use serde::{Deserialize, Serialize}; @@ -404,12 +403,13 @@ impl DataAvailability { .last() .map(|block| block.height() + 1) .unwrap_or(BlockHeight(0)); - let Ok(mut stream) = RawDAListener::new(&ip, start).await else { - bail!("Error occured setting up the DA listener"); - }; + let mut client = codec_data_availability::connect("block_catcher".to_string(), ip) + .await + .context("Error occured setting up the DA listener")?; + client.send(DataAvailabilityRequest(start)).await?; self.catchup_task = Some(tokio::spawn(async move { loop { - match stream.recv().await { + match client.recv().await { None => { break; } diff --git a/src/indexer/da_listener.rs b/src/indexer/da_listener.rs index 7335ca158..22254b175 100644 --- a/src/indexer/da_listener.rs +++ b/src/indexer/da_listener.rs @@ -1,9 +1,6 @@ -use std::{ - ops::{Deref, DerefMut}, - sync::Arc, -}; +use std::sync::Arc; -use anyhow::{bail, Result}; +use anyhow::Result; use hyle_model::Hashed; use tracing::{debug, info}; @@ -34,24 +31,7 @@ pub struct DAListener { config: SharedConf, bus: DAListenerBusClient, node_state: NodeState, - listener: RawDAListener, -} - -/// Implementation of the bit that actually listens to the data availability stream -pub struct RawDAListener { - client: codec_data_availability::Client, -} - -impl Deref for RawDAListener { - type Target = codec_data_availability::Client; - fn deref(&self) -> &Self::Target { - &self.client - } -} -impl DerefMut for RawDAListener { - fn deref_mut(&mut self) -> &mut Self::Target { - &mut self.client - } + start_block: BlockHeight, } pub struct DAListenerCtx { @@ -73,7 +53,6 @@ impl Module for DAListener { let start_block = ctx.start_block.unwrap_or(node_state.current_height); - let listener = RawDAListener::new(&ctx.common.config.da_address, start_block).await?; let bus = DAListenerBusClient::new_from_bus(ctx.common.bus.new_handle()).await; for name in node_state.contracts.keys() { @@ -82,7 +61,7 @@ impl Module for DAListener { Ok(DAListener { config: ctx.common.config.clone(), - listener, + start_block, bus, node_state, }) @@ -94,14 +73,32 @@ impl Module for DAListener { } impl DAListener { + async fn start_client( + &self, + block_height: BlockHeight, + ) -> Result { + let mut client = codec_data_availability::connect( + "raw_da_listener".to_string(), + self.config.da_address.to_string(), + ) + .await?; + + client.send(DataAvailabilityRequest(block_height)).await?; + + Ok(client) + } pub async fn start(&mut self) -> Result<()> { + let mut client = self.start_client(self.start_block).await?; + module_handle_messages! { on_bus self.bus, - frame = self.listener.recv() => { - let Some(streamed_signed_block) = frame else { - bail!("DA stream closed"); - }; - self.processing_next_frame(streamed_signed_block).await.log_error("Consuming da stream")?; + frame = client.recv() => { + if let Some(streamed_signed_block) = frame { + self.processing_next_frame(streamed_signed_block).await.log_error("Consuming da stream")?; + client.ping().await?; + } else { + client = self.start_client(self.node_state.current_height + 1).await?; + } } }; let _ = Self::save_on_disk::( @@ -129,18 +126,6 @@ impl DAListener { self.bus.send(NodeStateEvent::NewBlock(Box::new(block)))?; } - self.listener.ping().await?; - Ok(()) } } - -impl RawDAListener { - pub async fn new(target: &str, height: BlockHeight) -> Result { - let mut client = - codec_data_availability::connect("raw_da_listener".to_string(), target.to_string()) - .await?; - client.send(DataAvailabilityRequest(height)).await?; - Ok(RawDAListener { client }) - } -} diff --git a/src/mempool.rs b/src/mempool.rs index 5bb770efb..a4ff0f393 100644 --- a/src/mempool.rs +++ b/src/mempool.rs @@ -83,7 +83,6 @@ struct MempoolBusClient { sender(MempoolBlockEvent), sender(MempoolStatusEvent), sender(InternalMempoolEvent), - receiver(InternalMempoolEvent), receiver(SignedByValidator), receiver(RestApiMessage), receiver(TcpServerMessage), @@ -91,6 +90,7 @@ struct MempoolBusClient { receiver(GenesisEvent), receiver(NodeStateEvent), receiver(Query), + receiver(InternalMempoolEvent), } } @@ -374,7 +374,10 @@ impl Mempool { } fn handle_data_proposal_management(&mut self) -> Result<()> { - trace!("🌝 Handling DataProposal management"); + debug!( + "🌝 Handling DataProposal management with {} txs", + self.waiting_dissemination_txs.len() + ); // Create new DataProposal with pending txs let crypto = self.crypto.clone(); let new_txs = std::mem::take(&mut self.waiting_dissemination_txs); @@ -947,6 +950,9 @@ impl Mempool { fn on_new_tx(&mut self, tx: Transaction) -> Result<()> { // TODO: Verify fees ? + let tx_type: &'static str = (&tx.transaction_data).into(); + trace!("Tx {} received in mempool", tx_type); + let tx_hash = tx.hashed(); match tx.transaction_data { @@ -965,13 +971,15 @@ impl Mempool { let kc = self.known_contracts.clone(); let sender: &tokio::sync::broadcast::Sender = self.bus.get(); let sender = sender.clone(); - tokio::task::spawn_blocking(move || { + let t = tokio::task::spawn_blocking(move || { let tx = Self::process_proof_tx(kc, tx).log_error("Error processing proof tx")?; sender .send(InternalMempoolEvent::OnProcessedNewTx(tx)) .log_warn("sending processed TX") }); + while !t.is_finished() {} + return Ok(()); } TransactionData::VerifiedProof(ref proof_tx) => { diff --git a/src/node_state.rs b/src/node_state.rs index 796b05255..2ce07a9d9 100644 --- a/src/node_state.rs +++ b/src/node_state.rs @@ -70,6 +70,16 @@ impl Default for NodeState { impl NodeState { pub fn handle_signed_block(&mut self, signed_block: &SignedBlock) -> Block { + let next_block = self.current_height + 1 != signed_block.height(); + let initial_block = self.current_height.0 == 0 && signed_block.height().0 == 0; + if !next_block && !initial_block { + error!( + "Handling signed block of height {} while current height is {}", + signed_block.height(), + self.current_height + ); + } + self.current_height = signed_block.height(); let mut block_under_construction = Block { diff --git a/tests/consensus_tests.rs b/tests/consensus_tests.rs index c5ec0f3c6..1f7c83feb 100644 --- a/tests/consensus_tests.rs +++ b/tests/consensus_tests.rs @@ -178,8 +178,6 @@ mod e2e_consensus { tracing::warn!("Register TX Hash: {:?}", tx_hash); } - // tokio::time::sleep(Duration::from_millis(500)).await; - { let mut transaction = ProvableBlobTx::new("faucet.hydentity".into()); @@ -201,8 +199,6 @@ mod e2e_consensus { tracing::warn!("Transfer TX Hash: {:?}", tx_hash); } - // tokio::time::sleep(Duration::from_millis(500)).await; - Ok(()) } @@ -283,9 +279,6 @@ mod e2e_consensus { _ = ctx.wait_height(1).await; - let consensus = ctx.client().get_consensus_info().await?; - assert_eq!(consensus.validators.len(), 0, "expected 0 validators"); - // Gen a few txs let mut tx_ctx = init_states(&mut ctx).await; @@ -295,22 +288,13 @@ mod e2e_consensus { _ = gen_txs(&mut ctx, &mut tx_ctx, format!("alex{}", i), 100 + i).await; } - ctx.wait_height(1).await?; - ctx.stop_node(0).await?; ctx.restart_node(0)?; - ctx.wait_height(3).await?; - - // Resync last state - // let mut tx_ctx = init_states(&mut ctx).await; - - let consensus = ctx.client().get_consensus_info().await?; - assert_eq!(consensus.validators.len(), 0, "expected 0 validators"); + ctx.wait_height(1).await?; for i in 6..10 { _ = gen_txs(&mut ctx, &mut tx_ctx, format!("alex{}", i), 100 + i).await; - tokio::time::sleep(Duration::from_millis(100)).await; } ctx.wait_height(1).await?;