Skip to content

Commit

Permalink
fix async proofs + indexer client
Browse files Browse the repository at this point in the history
  • Loading branch information
hhalex committed Feb 24, 2025
1 parent bbd3e38 commit dba194d
Show file tree
Hide file tree
Showing 7 changed files with 76 additions and 83 deletions.
31 changes: 17 additions & 14 deletions src/bin/hyle.rs
Original file line number Diff line number Diff line change
Expand Up @@ -152,19 +152,6 @@ async fn main() -> Result<()> {
};

let mut handler = ModulesHandler::new(&bus).await;
handler.build_module::<Mempool>(ctx.clone()).await?;

handler.build_module::<Genesis>(ctx.clone()).await?;
if config.single_node.unwrap_or(false) {
handler
.build_module::<SingleNodeConsensus>(ctx.clone())
.await?;
} else {
handler.build_module::<Consensus>(ctx.clone()).await?;
}
handler
.build_module::<MockWorkflowHandler>(ctx.clone())
.await?;

if run_indexer {
handler.build_module::<Indexer>(ctx.common.clone()).await?;
Expand All @@ -187,11 +174,27 @@ async fn main() -> Result<()> {
})
.await?;
}

handler
.build_module::<NodeStateModule>(ctx.common.clone())
.await?;

handler
.build_module::<DataAvailability>(ctx.clone())
.await?;

handler.build_module::<Mempool>(ctx.clone()).await?;

handler.build_module::<Genesis>(ctx.clone()).await?;
if config.single_node.unwrap_or(false) {
handler
.build_module::<SingleNodeConsensus>(ctx.clone())
.await?;
} else {
handler.build_module::<Consensus>(ctx.clone()).await?;
}
handler
.build_module::<NodeStateModule>(ctx.common.clone())
.build_module::<MockWorkflowHandler>(ctx.clone())
.await?;

handler.build_module::<P2P>(ctx.clone()).await?;
Expand Down
3 changes: 3 additions & 0 deletions src/bus/command_response.rs
Original file line number Diff line number Diff line change
Expand Up @@ -168,10 +168,13 @@ macro_rules! handle_messages {
}
}
}

while let Ok($res) = $index.try_recv() {
receive_bus_metrics::<$message, _>(&mut $bus);
$handler;
};

tracing::trace!("Remaining messages in topic {}: {}", stringify!($message), $index.len());
};

// Fallback to else case
Expand Down
14 changes: 7 additions & 7 deletions src/data_availability.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,13 +9,12 @@ mod blocks_memory;
use blocks_fjall::Blocks;
//use blocks_memory::Blocks;

use codec::{codec_data_availability, DataAvailabilityEvent};
use codec::{codec_data_availability, DataAvailabilityEvent, DataAvailabilityRequest};

use crate::{
bus::{BusClientSender, BusMessage},
consensus::{ConsensusCommand, ConsensusEvent},
genesis::GenesisEvent,
indexer::da_listener::RawDAListener,
mempool::{MempoolBlockEvent, MempoolStatusEvent},
model::*,
module_handle_messages,
Expand All @@ -27,7 +26,7 @@ use crate::{
modules::{module_bus_client, Module},
},
};
use anyhow::{bail, Context, Error, Result};
use anyhow::{Context, Error, Result};
use borsh::{BorshDeserialize, BorshSerialize};
use core::str;
use serde::{Deserialize, Serialize};
Expand Down Expand Up @@ -404,12 +403,13 @@ impl DataAvailability {
.last()
.map(|block| block.height() + 1)
.unwrap_or(BlockHeight(0));
let Ok(mut stream) = RawDAListener::new(&ip, start).await else {
bail!("Error occured setting up the DA listener");
};
let mut client = codec_data_availability::connect("block_catcher".to_string(), ip)
.await
.context("Error occured setting up the DA listener")?;
client.send(DataAvailabilityRequest(start)).await?;
self.catchup_task = Some(tokio::spawn(async move {
loop {
match stream.recv().await {
match client.recv().await {
None => {
break;
}
Expand Down
69 changes: 27 additions & 42 deletions src/indexer/da_listener.rs
Original file line number Diff line number Diff line change
@@ -1,9 +1,6 @@
use std::{
ops::{Deref, DerefMut},
sync::Arc,
};
use std::sync::Arc;

use anyhow::{bail, Result};
use anyhow::Result;
use hyle_model::Hashed;
use tracing::{debug, info};

Expand Down Expand Up @@ -34,24 +31,7 @@ pub struct DAListener {
config: SharedConf,
bus: DAListenerBusClient,
node_state: NodeState,
listener: RawDAListener,
}

/// Implementation of the bit that actually listens to the data availability stream
pub struct RawDAListener {
client: codec_data_availability::Client,
}

impl Deref for RawDAListener {
type Target = codec_data_availability::Client;
fn deref(&self) -> &Self::Target {
&self.client
}
}
impl DerefMut for RawDAListener {
fn deref_mut(&mut self) -> &mut Self::Target {
&mut self.client
}
start_block: BlockHeight,
}

pub struct DAListenerCtx {
Expand All @@ -73,7 +53,6 @@ impl Module for DAListener {

let start_block = ctx.start_block.unwrap_or(node_state.current_height);

let listener = RawDAListener::new(&ctx.common.config.da_address, start_block).await?;
let bus = DAListenerBusClient::new_from_bus(ctx.common.bus.new_handle()).await;

for name in node_state.contracts.keys() {
Expand All @@ -82,7 +61,7 @@ impl Module for DAListener {

Ok(DAListener {
config: ctx.common.config.clone(),
listener,
start_block,
bus,
node_state,
})
Expand All @@ -94,14 +73,32 @@ impl Module for DAListener {
}

impl DAListener {
async fn start_client(
&self,
block_height: BlockHeight,
) -> Result<codec_data_availability::Client> {
let mut client = codec_data_availability::connect(
"raw_da_listener".to_string(),
self.config.da_address.to_string(),
)
.await?;

client.send(DataAvailabilityRequest(block_height)).await?;

Ok(client)
}
pub async fn start(&mut self) -> Result<()> {
let mut client = self.start_client(self.start_block).await?;

module_handle_messages! {
on_bus self.bus,
frame = self.listener.recv() => {
let Some(streamed_signed_block) = frame else {
bail!("DA stream closed");
};
self.processing_next_frame(streamed_signed_block).await.log_error("Consuming da stream")?;
frame = client.recv() => {
if let Some(streamed_signed_block) = frame {
self.processing_next_frame(streamed_signed_block).await.log_error("Consuming da stream")?;
client.ping().await?;
} else {
client = self.start_client(self.node_state.current_height + 1).await?;
}
}
};
let _ = Self::save_on_disk::<NodeState>(
Expand Down Expand Up @@ -129,18 +126,6 @@ impl DAListener {
self.bus.send(NodeStateEvent::NewBlock(Box::new(block)))?;
}

self.listener.ping().await?;

Ok(())
}
}

impl RawDAListener {
pub async fn new(target: &str, height: BlockHeight) -> Result<Self> {
let mut client =
codec_data_availability::connect("raw_da_listener".to_string(), target.to_string())
.await?;
client.send(DataAvailabilityRequest(height)).await?;
Ok(RawDAListener { client })
}
}
14 changes: 11 additions & 3 deletions src/mempool.rs
Original file line number Diff line number Diff line change
Expand Up @@ -83,14 +83,14 @@ struct MempoolBusClient {
sender(MempoolBlockEvent),
sender(MempoolStatusEvent),
sender(InternalMempoolEvent),
receiver(InternalMempoolEvent),
receiver(SignedByValidator<MempoolNetMessage>),
receiver(RestApiMessage),
receiver(TcpServerMessage),
receiver(ConsensusEvent),
receiver(GenesisEvent),
receiver(NodeStateEvent),
receiver(Query<QueryNewCut, Cut>),
receiver(InternalMempoolEvent),
}
}

Expand Down Expand Up @@ -374,7 +374,10 @@ impl Mempool {
}

fn handle_data_proposal_management(&mut self) -> Result<()> {
trace!("🌝 Handling DataProposal management");
debug!(
"🌝 Handling DataProposal management with {} txs",
self.waiting_dissemination_txs.len()
);
// Create new DataProposal with pending txs
let crypto = self.crypto.clone();
let new_txs = std::mem::take(&mut self.waiting_dissemination_txs);
Expand Down Expand Up @@ -947,6 +950,9 @@ impl Mempool {
fn on_new_tx(&mut self, tx: Transaction) -> Result<()> {
// TODO: Verify fees ?

let tx_type: &'static str = (&tx.transaction_data).into();
trace!("Tx {} received in mempool", tx_type);

let tx_hash = tx.hashed();

match tx.transaction_data {
Expand All @@ -965,13 +971,15 @@ impl Mempool {
let kc = self.known_contracts.clone();
let sender: &tokio::sync::broadcast::Sender<InternalMempoolEvent> = self.bus.get();
let sender = sender.clone();
tokio::task::spawn_blocking(move || {
let t = tokio::task::spawn_blocking(move || {
let tx =
Self::process_proof_tx(kc, tx).log_error("Error processing proof tx")?;
sender
.send(InternalMempoolEvent::OnProcessedNewTx(tx))
.log_warn("sending processed TX")
});
while !t.is_finished() {}

return Ok(());
}
TransactionData::VerifiedProof(ref proof_tx) => {
Expand Down
10 changes: 10 additions & 0 deletions src/node_state.rs
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,16 @@ impl Default for NodeState {

impl NodeState {
pub fn handle_signed_block(&mut self, signed_block: &SignedBlock) -> Block {
let next_block = self.current_height + 1 != signed_block.height();
let initial_block = self.current_height.0 == 0 && signed_block.height().0 == 0;
if !next_block && !initial_block {
error!(
"Handling signed block of height {} while current height is {}",
signed_block.height(),
self.current_height
);
}

self.current_height = signed_block.height();

let mut block_under_construction = Block {
Expand Down
18 changes: 1 addition & 17 deletions tests/consensus_tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -178,8 +178,6 @@ mod e2e_consensus {
tracing::warn!("Register TX Hash: {:?}", tx_hash);
}

// tokio::time::sleep(Duration::from_millis(500)).await;

{
let mut transaction = ProvableBlobTx::new("faucet.hydentity".into());

Expand All @@ -201,8 +199,6 @@ mod e2e_consensus {
tracing::warn!("Transfer TX Hash: {:?}", tx_hash);
}

// tokio::time::sleep(Duration::from_millis(500)).await;

Ok(())
}

Expand Down Expand Up @@ -283,9 +279,6 @@ mod e2e_consensus {

_ = ctx.wait_height(1).await;

let consensus = ctx.client().get_consensus_info().await?;
assert_eq!(consensus.validators.len(), 0, "expected 0 validators");

// Gen a few txs
let mut tx_ctx = init_states(&mut ctx).await;

Expand All @@ -295,22 +288,13 @@ mod e2e_consensus {
_ = gen_txs(&mut ctx, &mut tx_ctx, format!("alex{}", i), 100 + i).await;
}

ctx.wait_height(1).await?;

ctx.stop_node(0).await?;
ctx.restart_node(0)?;

ctx.wait_height(3).await?;

// Resync last state
// let mut tx_ctx = init_states(&mut ctx).await;

let consensus = ctx.client().get_consensus_info().await?;
assert_eq!(consensus.validators.len(), 0, "expected 0 validators");
ctx.wait_height(1).await?;

for i in 6..10 {
_ = gen_txs(&mut ctx, &mut tx_ctx, format!("alex{}", i), 100 + i).await;
tokio::time::sleep(Duration::from_millis(100)).await;
}

ctx.wait_height(1).await?;
Expand Down

0 comments on commit dba194d

Please sign in to comment.