Skip to content

Commit

Permalink
Add on_receipt with preprocessed IncompleteTransaction
Browse files Browse the repository at this point in the history
  • Loading branch information
Sliman4 committed May 3, 2024
1 parent d34d59a commit 10aa0e0
Show file tree
Hide file tree
Showing 2 changed files with 57 additions and 32 deletions.
49 changes: 18 additions & 31 deletions src/indexer_state.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,9 +2,11 @@ use std::fmt::{Debug, Display};
use std::{collections::HashMap, time::Duration};

use near_indexer_primitives::types::BlockHeightDelta;
use near_indexer_primitives::{CryptoHash, IndexerTransactionWithOutcome, StreamerMessage};
use near_indexer_primitives::{CryptoHash, StreamerMessage};

use crate::{BlockProcessingOptions, CompleteTransaction, Indexer, TransactionReceipt};
use crate::{
BlockProcessingOptions, CompleteTransaction, IncompleteTransaction, Indexer, TransactionReceipt,
};

const BLOCK_PROCESSING_WARNING_THRESHOLD: Duration = Duration::from_millis(300);
const PERFORMANCE_REPORT_EVERY_BLOCKS: BlockHeightDelta = 5000;
Expand All @@ -17,28 +19,6 @@ pub(crate) struct IndexerState {
time_spent: Duration,
}

#[derive(Debug)]
struct IncompleteTransaction {
transaction: IndexerTransactionWithOutcome,
receipts: HashMap<CryptoHash, Option<TransactionReceipt>>,
}

impl TryFrom<&IncompleteTransaction> for CompleteTransaction {
type Error = &'static str;

fn try_from(value: &IncompleteTransaction) -> Result<Self, Self::Error> {
let receipts = value
.receipts
.values()
.map(|receipt| receipt.clone().ok_or("Missing receipt"))
.collect::<Result<Vec<_>, _>>()?;
Ok(Self {
transaction: value.transaction.clone(),
receipts,
})
}
}

impl IndexerState {
pub fn new() -> Self {
Self {
Expand Down Expand Up @@ -134,13 +114,20 @@ impl IndexerState {
if let Some(incomplete_transaction) =
self.pending_transactions.get_mut(&tx_id)
{
incomplete_transaction.receipts.insert(
receipt.receipt.receipt_id,
Some(TransactionReceipt {
receipt: receipt.clone(),
block_height: message.block.header.height,
}),
);
let processed_receipt = TransactionReceipt {
receipt: receipt.clone(),
block_height: message.block.header.height,
};

if options.handle_preprocessed_transactions_by_indexer {
indexer
.on_receipt(&processed_receipt, incomplete_transaction, message)
.await?;
}

incomplete_transaction
.receipts
.insert(receipt.receipt.receipt_id, Some(processed_receipt));
for new_receipt_id in
receipt.execution_outcome.outcome.receipt_ids.iter()
{
Expand Down
40 changes: 39 additions & 1 deletion src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ pub mod multiindexer;
pub mod near_utils;

use std::{
collections::HashMap,
fmt::Debug,
ops::Range,
path::PathBuf,
Expand All @@ -48,7 +49,7 @@ pub use near_indexer_primitives;
use near_indexer_primitives::{
types::{BlockHeight, BlockHeightDelta},
views::ExecutionStatusView,
IndexerExecutionOutcomeWithReceipt, IndexerTransactionWithOutcome, StreamerMessage,
CryptoHash, IndexerExecutionOutcomeWithReceipt, IndexerTransactionWithOutcome, StreamerMessage,
};
use near_utils::{is_receipt_successful, MAINNET_GENESIS_BLOCK_HEIGHT};
use serde::{Deserialize, Serialize};
Expand Down Expand Up @@ -101,6 +102,15 @@ pub trait Indexer: Send + Sync + 'static {
) -> Result<(), Self::Error> {
Ok(())
}

async fn on_receipt(
&mut self,
_receipt: &TransactionReceipt,
_transaction: &IncompleteTransaction,
_block: &StreamerMessage,
) -> Result<(), Self::Error> {
Ok(())
}
}

#[derive(Debug, Serialize, Deserialize, Clone)]
Expand All @@ -120,6 +130,34 @@ impl CompleteTransaction {
}
}

#[derive(Debug)]
pub struct IncompleteTransaction {
pub transaction: IndexerTransactionWithOutcome,
/// Receipts with None are created by a transaction or another receipt, but are not yet available.
///
/// This map does not contain all receipts of a transaction, since this on_receipt is called
/// before the transaction is fully complete, so there's no way to know how many receipts there will be.
///
/// During on_receipt, the receipt you're processing is None in this map.
pub receipts: HashMap<CryptoHash, Option<TransactionReceipt>>,
}

impl TryFrom<&IncompleteTransaction> for CompleteTransaction {
type Error = &'static str;

fn try_from(value: &IncompleteTransaction) -> Result<Self, Self::Error> {
let receipts = value
.receipts
.values()
.map(|receipt| receipt.clone().ok_or("Missing receipt"))
.collect::<Result<Vec<_>, _>>()?;
Ok(Self {
transaction: value.transaction.clone(),
receipts,
})
}
}

#[derive(Debug, Serialize, Deserialize, Clone)]
pub struct TransactionReceipt {
pub receipt: IndexerExecutionOutcomeWithReceipt,
Expand Down

0 comments on commit 10aa0e0

Please sign in to comment.