From 3a52bff838fc2d219510d424dd57809c9942197b Mon Sep 17 00:00:00 2001 From: sliman4 <4sliman4@gmail.com> Date: Wed, 1 May 2024 15:22:45 +0300 Subject: [PATCH] Add AutoContinueEnd --- examples/download.rs | 2 ++ examples/ft_transfers.rs | 1 + src/{tests.rs => indexer_tests.rs} | 28 ++++++++++------------- src/indexer_utils.rs | 7 ++++-- src/lib.rs | 36 +++++++++++++++++++++++++----- 5 files changed, 50 insertions(+), 24 deletions(-) rename src/{tests.rs => indexer_tests.rs} (94%) diff --git a/examples/download.rs b/examples/download.rs index 248c3c6..f3d97d1 100644 --- a/examples/download.rs +++ b/examples/download.rs @@ -103,6 +103,7 @@ async fn main() -> Result<(), Box> { start_height_if_does_not_exist: start_block_height, save_location: Box::new(path.join("last_block.txt")), ctrl_c_handler: true, + end: inindexer::AutoContinueEnd::Infinite, }) } (StartBlockHeight::Genesis, EndBlockHeight::BlockHeight(end_block_height)) => { @@ -116,6 +117,7 @@ async fn main() -> Result<(), Box> { start_height_if_does_not_exist: start_block_height, save_location: Box::new(path.join("last_block.txt")), ctrl_c_handler: true, + end: inindexer::AutoContinueEnd::Infinite, }) } ( diff --git a/examples/ft_transfers.rs b/examples/ft_transfers.rs index 46188c5..1ccbefa 100644 --- a/examples/ft_transfers.rs +++ b/examples/ft_transfers.rs @@ -74,6 +74,7 @@ async fn main() -> Result<(), Box> { save_location: Box::new(PathBuf::from("example_ft_trasnfers_last_block.txt")), start_height_if_does_not_exist: 114_625_946, ctrl_c_handler: true, + end: inindexer::AutoContinueEnd::Infinite, }), stop_on_error: false, preprocess_transactions: None, diff --git a/src/tests.rs b/src/indexer_tests.rs similarity index 94% rename from src/tests.rs rename to src/indexer_tests.rs index 222f500..81c222b 100644 --- a/src/tests.rs +++ b/src/indexer_tests.rs @@ -7,8 +7,8 @@ use std::{collections::HashMap, ops::Range, path::PathBuf}; use crate::lake::LakeStreamer; use crate::{ fastnear_data_server::FastNearDataServerProvider, indexer_utils::MAINNET_GENESIS_BLOCK_HEIGHT, - message_provider::ParallelProviderStreamer, AutoContinue, BlockIterator, CompleteTransaction, - IndexerOptions, PreprocessTransactionsSettings, + message_provider::ParallelProviderStreamer, AutoContinue, AutoContinueEnd, BlockIterator, + CompleteTransaction, IndexerOptions, PreprocessTransactionsSettings, }; use async_trait::async_trait; use near_indexer_primitives::{ @@ -159,45 +159,41 @@ async fn auto_continue() { let save_file = temp_file::with_contents(MAINNET_GENESIS_BLOCK_HEIGHT.to_string().as_bytes()); let save_path = save_file.path(); - let indexer_task = run_indexer( + run_indexer( &mut indexer, FastNearDataServerProvider::mainnet(), IndexerOptions { range: BlockIterator::AutoContinue(AutoContinue { save_location: Box::new(PathBuf::from(save_path)), + end: AutoContinueEnd::Count(5), ..Default::default() }), preprocess_transactions: None, ..Default::default() }, - ); - let timer_task = tokio::time::sleep(std::time::Duration::from_secs(2)); - tokio::select! { - _ = indexer_task => {}, - _ = timer_task => {}, - } + ) + .await + .unwrap(); assert!(indexer.last_block_height > MAINNET_GENESIS_BLOCK_HEIGHT); let current_height = indexer.last_block_height; - let indexer_task = run_indexer( + run_indexer( &mut indexer, FastNearDataServerProvider::mainnet(), IndexerOptions { range: BlockIterator::AutoContinue(AutoContinue { save_location: Box::new(PathBuf::from(save_path)), + end: AutoContinueEnd::Count(5), ..Default::default() }), preprocess_transactions: None, ..Default::default() }, - ); - let timer_task = tokio::time::sleep(std::time::Duration::from_secs(2)); - tokio::select! { - _ = indexer_task => {}, - _ = timer_task => {}, - } + ) + .await + .unwrap(); assert!(indexer.last_block_height > current_height); } diff --git a/src/indexer_utils.rs b/src/indexer_utils.rs index 2cedca4..9b18d70 100644 --- a/src/indexer_utils.rs +++ b/src/indexer_utils.rs @@ -514,10 +514,13 @@ fn test_dec_format_vec() { } let test_vec = TestVec { - vec: vec![1_000_000_000_000_000_000_000, 2_000_000_000_000_000_000_000] + vec: vec![1_000_000_000_000_000_000_000, 2_000_000_000_000_000_000_000], }; let serialized = serde_json::to_string(&test_vec).unwrap(); - assert_eq!(serialized, r#"["1000000000000000000000","2000000000000000000000"]"#); + assert_eq!( + serialized, + r#"["1000000000000000000000","2000000000000000000000"]"# + ); let deserialized: TestVec = serde_json::from_str(&serialized).unwrap(); assert_eq!(deserialized, test_vec); } diff --git a/src/lib.rs b/src/lib.rs index a21d745..4750495 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -23,16 +23,17 @@ #[cfg(feature = "fastnear-data-server")] pub mod fastnear_data_server; mod indexer_state; +#[cfg(test)] +mod indexer_tests; pub mod indexer_utils; #[cfg(feature = "lake")] pub mod lake; #[cfg(feature = "message-provider")] pub mod message_provider; -#[cfg(test)] -mod tests; use std::{ fmt::{Debug, Display}, + ops::Range, path::PathBuf, sync::{ atomic::{AtomicBool, AtomicU64, Ordering}, @@ -144,13 +145,13 @@ pub async fn run_indexer< match options.range { BlockIterator::Iterator(range) => (range, None, None), BlockIterator::AutoContinue(auto_continue) if auto_continue.ctrl_c_handler => ( - Box::new(auto_continue.get_start_block().await..) + Box::new(auto_continue.range().await) as Box + Send>, Some(mpsc::channel::<()>(1)), Some(Box::new(auto_continue)), ), BlockIterator::AutoContinue(auto_continue) => ( - Box::new(auto_continue.get_start_block().await..) + Box::new(auto_continue.range().await) as Box + Send>, None, Some(Box::new(auto_continue)), @@ -353,9 +354,9 @@ impl Default for IndexerOptions { fn default() -> Self { Self { stop_on_error: false, - range: BlockIterator::Iterator(Box::new(std::iter::once_with(|| { + range: BlockIterator::iterator(std::iter::once_with(|| { panic!("Range is not set in IndexerOptions") - }))), + })), preprocess_transactions: Some(PreprocessTransactionsSettings::default()), genesis_block_height: MAINNET_GENESIS_BLOCK_HEIGHT, } @@ -372,6 +373,18 @@ pub struct AutoContinue { /// If true, the indexer will gracefully stop on Ctrl+C signal, avoiding double processing of transactions. /// Transactions that have started, but not finished processing, will be processed again on the next run. pub ctrl_c_handler: bool, + /// If set, the indexer will stop processing blocks after this height. If None, the indexer will process + /// blocks infinitely. + pub end: AutoContinueEnd, +} + +pub enum AutoContinueEnd { + /// The indexer will stop processing blocks after this height. + Height(BlockHeight), + /// The indexer will process this many blocks and then stop. + Count(BlockHeightDelta), + /// The indexer will process blocks infinitely. + Infinite, } impl AutoContinue { @@ -381,6 +394,16 @@ impl AutoContinue { .await .unwrap_or(self.start_height_if_does_not_exist) } + + pub async fn range(&self) -> Range { + let start = self.get_start_block().await; + let end = match self.end { + AutoContinueEnd::Height(height) => height, + AutoContinueEnd::Count(count) => start + count, + AutoContinueEnd::Infinite => BlockHeight::MAX, + }; + start..end + } } impl Default for AutoContinue { @@ -389,6 +412,7 @@ impl Default for AutoContinue { save_location: Box::new(PathBuf::from("last-processed-block.txt")), start_height_if_does_not_exist: MAINNET_GENESIS_BLOCK_HEIGHT, ctrl_c_handler: true, + end: AutoContinueEnd::Infinite, } } }