diff --git a/crates/hyle-model/src/api.rs b/crates/hyle-model/src/api.rs index 612256f0e..c64b59947 100644 --- a/crates/hyle-model/src/api.rs +++ b/crates/hyle-model/src/api.rs @@ -86,6 +86,7 @@ pub enum TransactionTypeDb { pub enum TransactionStatusDb { WaitingDissemination, DataProposalCreated, + DataProposalPoda, Success, Failure, Sequenced, diff --git a/crates/hyle-model/src/node/mempool.rs b/crates/hyle-model/src/node/mempool.rs index 8578883fd..4dfa90e51 100644 --- a/crates/hyle-model/src/node/mempool.rs +++ b/crates/hyle-model/src/node/mempool.rs @@ -99,6 +99,12 @@ pub struct TxId(pub DataProposalHash, pub TxHash); #[cfg_attr(feature = "full", derive(utoipa::ToSchema))] pub struct DataProposalHash(pub String); +impl DataProposalHash { + pub fn genesis(v_pubkey: ValidatorPublicKey) -> DataProposalHash { + DataProposalHash(hex::encode(v_pubkey.0)) + } +} + impl Hashed for DataProposal { fn hashed(&self) -> DataProposalHash { if let Some(hash) = self.hash_cache.read().unwrap().as_ref() { diff --git a/src/indexer.rs b/src/indexer.rs index 6ecb09495..215898875 100644 --- a/src/indexer.rs +++ b/src/indexer.rs @@ -343,6 +343,45 @@ impl Indexer { .await .context("Upserting data at status data_proposal_created")?; } + + MempoolStatusEvent::DataProposalPoda { + data_proposal_hash: _, + txs_metadatas, + signatures: _, + } => { + let mut query_builder = QueryBuilder::new("INSERT INTO transactions (tx_hash, parent_dp_hash, version, transaction_type, transaction_status)"); + + query_builder.push_values(txs_metadatas, |mut b, value| { + let tx_type: TransactionTypeDb = value.transaction_kind.into(); + let version = i32::try_from(value.version) + .map_err(|_| anyhow::anyhow!("Tx version is too large to fit into an i32")) + .log_error("Converting version number into i32") + .unwrap_or(0); + + let tx_hash: TxHashDb = value.id.1.into(); + let parent_data_proposal_hash_db: DataProposalHashDb = value.id.0.into(); + + b.push_bind(tx_hash) + .push_bind(parent_data_proposal_hash_db) + .push_bind(version) + .push_bind(tx_type) + .push_bind(TransactionStatusDb::DataProposalPoda); + }); + + // If the TX is already present, we try to update its status, only if the status is lower ('waiting_dissemination', 'data_proposal_created'). + query_builder.push(" ON CONFLICT(tx_hash, parent_dp_hash) DO UPDATE SET "); + + query_builder.push("transaction_status="); + query_builder.push_bind(TransactionStatusDb::DataProposalPoda); + query_builder + .push(" WHERE transactions.transaction_status IN ('waiting_dissemination', 'data_proposal_created')"); + + query_builder + .build() + .execute(transaction.deref_mut()) + .await + .context("Upserting data at status data_proposal_poda")?; + } } transaction.commit().await?; @@ -1196,6 +1235,47 @@ mod test { ) .await; + let data_proposal_poda_event = MempoolStatusEvent::DataProposalPoda { + signatures: vec![], + data_proposal_hash: data_proposal.hashed(), + txs_metadatas: vec![ + register_tx_1_wd.metadata(parent_data_proposal_hash.clone()), + register_tx_2_wd.metadata(parent_data_proposal_hash.clone()), + blob_transaction_wd.metadata(parent_data_proposal_hash.clone()), + proof_tx_1_wd.metadata(parent_data_proposal_hash.clone()), + ], + }; + + indexer + .handle_mempool_status_event(data_proposal_poda_event.clone()) + .await + .expect("MempoolStatusEvent"); + + assert_tx_status( + &server, + register_tx_1_wd.hashed(), + TransactionStatusDb::DataProposalPoda, + ) + .await; + assert_tx_status( + &server, + register_tx_2_wd.hashed(), + TransactionStatusDb::DataProposalPoda, + ) + .await; + assert_tx_status( + &server, + blob_transaction_wd.hashed(), + TransactionStatusDb::DataProposalPoda, + ) + .await; + assert_tx_status( + &server, + proof_tx_1_wd.hashed(), + TransactionStatusDb::DataProposalPoda, + ) + .await; + let mut signed_block = SignedBlock::default(); signed_block.consensus_proposal.timestamp = 1234; signed_block.consensus_proposal.slot = 2; diff --git a/src/indexer/migrations/20241009154948_create_tables.sql b/src/indexer/migrations/20241009154948_create_tables.sql index d1f4f9020..31b52ab5e 100644 --- a/src/indexer/migrations/20241009154948_create_tables.sql +++ b/src/indexer/migrations/20241009154948_create_tables.sql @@ -10,7 +10,7 @@ CREATE TABLE blocks ( ); CREATE TYPE transaction_type AS ENUM ('blob_transaction', 'proof_transaction', 'stake'); -CREATE TYPE transaction_status AS ENUM ('data_proposal_created','waiting_dissemination','success', 'failure', 'sequenced', 'timed_out'); +CREATE TYPE transaction_status AS ENUM ('data_proposal_poda', 'data_proposal_created','waiting_dissemination','success', 'failure', 'sequenced', 'timed_out'); CREATE TABLE transactions ( parent_dp_hash TEXT NOT NULL, -- Data Proposal hash diff --git a/src/mempool.rs b/src/mempool.rs index 5bb770efb..4b293d0ab 100644 --- a/src/mempool.rs +++ b/src/mempool.rs @@ -178,6 +178,11 @@ pub enum MempoolStatusEvent { data_proposal_hash: DataProposalHash, txs_metadatas: Vec, }, + DataProposalPoda { + data_proposal_hash: DataProposalHash, + txs_metadatas: Vec, + signatures: Vec, + }, } impl BusMessage for MempoolStatusEvent {} @@ -827,6 +832,14 @@ impl Mempool { || old_voting_power < 2 * f && new_voting_power >= 2 * f || new_voting_power == 3 * f + 1 { + let txs_metadatas = self.lanes.tx_metadatas(&data_proposal_hash)?; + + self.bus.send(MempoolStatusEvent::DataProposalPoda { + data_proposal_hash: data_proposal_hash.clone(), + txs_metadatas, + signatures: validators.clone(), + })?; + self.broadcast_net_message(MempoolNetMessage::PoDAUpdate( data_proposal_hash, signatures, @@ -1486,6 +1499,30 @@ pub mod test { let pub_key = self.validator_pubkey().clone(); self.pop_validator_data_proposal(&pub_key) } + pub fn peek_data_proposal(&mut self) -> (DataProposal, DataProposalHash, LaneBytesSize) { + let pub_key = self.validator_pubkey().clone(); + self.peek_validator_data_proposal(&pub_key) + } + + pub fn peek_validator_data_proposal( + &mut self, + validator: &ValidatorPublicKey, + ) -> (DataProposal, DataProposalHash, LaneBytesSize) { + // Get the latest lane entry + let latest_data_proposal_hash = self.current_hash(validator).unwrap(); + let latest_lane_entry = self + .mempool + .lanes + .get_by_hash(validator, &latest_data_proposal_hash) + .unwrap() + .unwrap(); + + ( + latest_lane_entry.data_proposal.clone(), + latest_data_proposal_hash, + latest_lane_entry.cumul_size, + ) + } pub fn pop_validator_data_proposal( &mut self, diff --git a/src/mempool/storage.rs b/src/mempool/storage.rs index 8645a0dd3..8d5eab4f9 100644 --- a/src/mempool/storage.rs +++ b/src/mempool/storage.rs @@ -102,6 +102,31 @@ pub trait Storage { Ok(cut) } + fn tx_metadatas( + &mut self, + data_proposal_hash: &DataProposalHash, + ) -> Result> { + let id = self.id().clone(); + match self.get_by_hash(&id, data_proposal_hash)? { + Some(lane_entry) => { + let parent_data_proposal_hash = lane_entry + .data_proposal + .parent_data_proposal_hash + .clone() + .unwrap_or(DataProposalHash::genesis(id.clone())); + let txs_metadatas: Vec = lane_entry + .data_proposal + .txs + .iter() + .map(|tx| tx.metadata(parent_data_proposal_hash.clone())) + .collect(); + Ok(txs_metadatas) + } + None => { + bail!("Could not find lane entry for {id} ({data_proposal_hash})"); + } + } + } // Called by the initial proposal validator to aggregate votes fn on_data_vote( &mut self, @@ -520,7 +545,7 @@ pub trait Storage { let parent_data_proposal = data_proposal .parent_data_proposal_hash .clone() - .unwrap_or_default(); + .unwrap_or_else(|| DataProposalHash(hex::encode(validator_key.0.clone()))); let tx_metadatas = data_proposal .txs diff --git a/src/tests/autobahn_testing.rs b/src/tests/autobahn_testing.rs index d7948b77e..0fa17c12b 100644 --- a/src/tests/autobahn_testing.rs +++ b/src/tests/autobahn_testing.rs @@ -339,11 +339,42 @@ async fn autobahn_basic_flow() { node1.mempool_ctx.submit_tx(®ister_tx); node1.mempool_ctx.submit_tx(®ister_tx_2); + assert_chanmsg_matches!( + node1.mempool_ctx.mempool_status_event_receiver, + MempoolStatusEvent::WaitingDissemination { parent_data_proposal_hash, tx } => { + assert_eq!(parent_data_proposal_hash, DataProposalHash(node1.mempool_ctx.validator_pubkey().to_string())); + assert_eq!(tx, register_tx); + } + ); + + assert_chanmsg_matches!( + node1.mempool_ctx.mempool_status_event_receiver, + MempoolStatusEvent::WaitingDissemination { parent_data_proposal_hash, tx } => { + assert_eq!(parent_data_proposal_hash, DataProposalHash(node1.mempool_ctx.validator_pubkey().to_string())); + assert_eq!(tx, register_tx_2); + } + ); + node1 .mempool_ctx .make_data_proposal_with_pending_txs() .expect("Should create data proposal"); + assert_chanmsg_matches!( + node1.mempool_ctx.mempool_status_event_receiver, + MempoolStatusEvent::DataProposalCreated { data_proposal_hash, txs_metadatas } => { + + // First txs refer to a genesis data proposal that has a hash corresponding to validator pubkey + let genesis_dp_hash = hex::encode(node1.mempool_ctx.validator_pubkey().clone().0); + for t in txs_metadatas.iter() { + assert_eq!(genesis_dp_hash, t.id.0.0); + } + + assert_eq!(data_proposal_hash, node1.mempool_ctx.peek_data_proposal().0.hashed()); + assert_eq!(txs_metadatas.len(), node1.mempool_ctx.peek_data_proposal().0.txs.len()); + } + ); + broadcast! { description: "Disseminate Tx", from: node1.mempool_ctx, to: [node2.mempool_ctx, node3.mempool_ctx, node4.mempool_ctx], @@ -369,6 +400,42 @@ async fn autobahn_basic_flow() { message_matches: MempoolNetMessage::DataVote(_, _) }; + assert_chanmsg_matches!( + node1.mempool_ctx.mempool_status_event_receiver, + MempoolStatusEvent::DataProposalPoda { data_proposal_hash, txs_metadatas, signatures } => { + + // First txs refer to a genesis data proposal that has a hash corresponding to validator pubkey + let genesis_dp_hash = hex::encode(node1.mempool_ctx.validator_pubkey().clone().0); + for t in txs_metadatas.iter() { + assert_eq!(genesis_dp_hash, t.id.0.0); + } + + assert!(signatures.contains(node1.mempool_ctx.validator_pubkey())); + assert!(signatures.contains(node2.mempool_ctx.validator_pubkey())); + + assert_eq!(data_proposal_hash, node1.mempool_ctx.peek_data_proposal().0.hashed()); + assert_eq!(txs_metadatas.len(), node1.mempool_ctx.peek_data_proposal().0.txs.len()); + } + ); + assert_chanmsg_matches!( + node1.mempool_ctx.mempool_status_event_receiver, + MempoolStatusEvent::DataProposalPoda { data_proposal_hash, txs_metadatas, signatures } => { + + // First txs refer to a genesis data proposal that has a hash corresponding to validator pubkey + let genesis_dp_hash = hex::encode(node1.mempool_ctx.validator_pubkey().clone().0); + for t in txs_metadatas.iter() { + assert_eq!(genesis_dp_hash, t.id.0.0); + } + + assert!(signatures.contains(node1.mempool_ctx.validator_pubkey())); + assert!(signatures.contains(node2.mempool_ctx.validator_pubkey())); + assert!(signatures.contains(node3.mempool_ctx.validator_pubkey())); + + assert_eq!(data_proposal_hash, node1.mempool_ctx.peek_data_proposal().0.hashed()); + assert_eq!(txs_metadatas.len(), node1.mempool_ctx.peek_data_proposal().0.txs.len()); + } + ); + let data_proposal_hash_node1 = node1 .mempool_ctx .current_hash(node1.mempool_ctx.validator_pubkey())