From 10aa0e0de7e6bdd7ae7f8bca1935f31cc85ca38a Mon Sep 17 00:00:00 2001 From: sliman4 <4sliman4@gmail.com> Date: Sat, 4 May 2024 00:04:36 +0300 Subject: [PATCH] Add on_receipt with preprocessed IncompleteTransaction --- src/indexer_state.rs | 49 ++++++++++++++++---------------------------- src/lib.rs | 40 +++++++++++++++++++++++++++++++++++- 2 files changed, 57 insertions(+), 32 deletions(-) diff --git a/src/indexer_state.rs b/src/indexer_state.rs index fdb0b70..16923a5 100644 --- a/src/indexer_state.rs +++ b/src/indexer_state.rs @@ -2,9 +2,11 @@ use std::fmt::{Debug, Display}; use std::{collections::HashMap, time::Duration}; use near_indexer_primitives::types::BlockHeightDelta; -use near_indexer_primitives::{CryptoHash, IndexerTransactionWithOutcome, StreamerMessage}; +use near_indexer_primitives::{CryptoHash, StreamerMessage}; -use crate::{BlockProcessingOptions, CompleteTransaction, Indexer, TransactionReceipt}; +use crate::{ + BlockProcessingOptions, CompleteTransaction, IncompleteTransaction, Indexer, TransactionReceipt, +}; const BLOCK_PROCESSING_WARNING_THRESHOLD: Duration = Duration::from_millis(300); const PERFORMANCE_REPORT_EVERY_BLOCKS: BlockHeightDelta = 5000; @@ -17,28 +19,6 @@ pub(crate) struct IndexerState { time_spent: Duration, } -#[derive(Debug)] -struct IncompleteTransaction { - transaction: IndexerTransactionWithOutcome, - receipts: HashMap>, -} - -impl TryFrom<&IncompleteTransaction> for CompleteTransaction { - type Error = &'static str; - - fn try_from(value: &IncompleteTransaction) -> Result { - let receipts = value - .receipts - .values() - .map(|receipt| receipt.clone().ok_or("Missing receipt")) - .collect::, _>>()?; - Ok(Self { - transaction: value.transaction.clone(), - receipts, - }) - } -} - impl IndexerState { pub fn new() -> Self { Self { @@ -134,13 +114,20 @@ impl IndexerState { if let Some(incomplete_transaction) = self.pending_transactions.get_mut(&tx_id) { - incomplete_transaction.receipts.insert( - receipt.receipt.receipt_id, - Some(TransactionReceipt { - receipt: receipt.clone(), - block_height: message.block.header.height, - }), - ); + let processed_receipt = TransactionReceipt { + receipt: receipt.clone(), + block_height: message.block.header.height, + }; + + if options.handle_preprocessed_transactions_by_indexer { + indexer + .on_receipt(&processed_receipt, incomplete_transaction, message) + .await?; + } + + incomplete_transaction + .receipts + .insert(receipt.receipt.receipt_id, Some(processed_receipt)); for new_receipt_id in receipt.execution_outcome.outcome.receipt_ids.iter() { diff --git a/src/lib.rs b/src/lib.rs index 320a5ee..0943865 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -33,6 +33,7 @@ pub mod multiindexer; pub mod near_utils; use std::{ + collections::HashMap, fmt::Debug, ops::Range, path::PathBuf, @@ -48,7 +49,7 @@ pub use near_indexer_primitives; use near_indexer_primitives::{ types::{BlockHeight, BlockHeightDelta}, views::ExecutionStatusView, - IndexerExecutionOutcomeWithReceipt, IndexerTransactionWithOutcome, StreamerMessage, + CryptoHash, IndexerExecutionOutcomeWithReceipt, IndexerTransactionWithOutcome, StreamerMessage, }; use near_utils::{is_receipt_successful, MAINNET_GENESIS_BLOCK_HEIGHT}; use serde::{Deserialize, Serialize}; @@ -101,6 +102,15 @@ pub trait Indexer: Send + Sync + 'static { ) -> Result<(), Self::Error> { Ok(()) } + + async fn on_receipt( + &mut self, + _receipt: &TransactionReceipt, + _transaction: &IncompleteTransaction, + _block: &StreamerMessage, + ) -> Result<(), Self::Error> { + Ok(()) + } } #[derive(Debug, Serialize, Deserialize, Clone)] @@ -120,6 +130,34 @@ impl CompleteTransaction { } } +#[derive(Debug)] +pub struct IncompleteTransaction { + pub transaction: IndexerTransactionWithOutcome, + /// Receipts with None are created by a transaction or another receipt, but are not yet available. + /// + /// This map does not contain all receipts of a transaction, since this on_receipt is called + /// before the transaction is fully complete, so there's no way to know how many receipts there will be. + /// + /// During on_receipt, the receipt you're processing is None in this map. + pub receipts: HashMap>, +} + +impl TryFrom<&IncompleteTransaction> for CompleteTransaction { + type Error = &'static str; + + fn try_from(value: &IncompleteTransaction) -> Result { + let receipts = value + .receipts + .values() + .map(|receipt| receipt.clone().ok_or("Missing receipt")) + .collect::, _>>()?; + Ok(Self { + transaction: value.transaction.clone(), + receipts, + }) + } +} + #[derive(Debug, Serialize, Deserialize, Clone)] pub struct TransactionReceipt { pub receipt: IndexerExecutionOutcomeWithReceipt,