Skip to content

Commit

Permalink
Add AutoContinueEnd
Browse files Browse the repository at this point in the history
  • Loading branch information
Sliman4 committed May 1, 2024
1 parent 729b656 commit 3a52bff
Show file tree
Hide file tree
Showing 5 changed files with 50 additions and 24 deletions.
2 changes: 2 additions & 0 deletions examples/download.rs
Original file line number Diff line number Diff line change
Expand Up @@ -103,6 +103,7 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
start_height_if_does_not_exist: start_block_height,
save_location: Box::new(path.join("last_block.txt")),
ctrl_c_handler: true,
end: inindexer::AutoContinueEnd::Infinite,
})
}
(StartBlockHeight::Genesis, EndBlockHeight::BlockHeight(end_block_height)) => {
Expand All @@ -116,6 +117,7 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
start_height_if_does_not_exist: start_block_height,
save_location: Box::new(path.join("last_block.txt")),
ctrl_c_handler: true,
end: inindexer::AutoContinueEnd::Infinite,
})
}
(
Expand Down
1 change: 1 addition & 0 deletions examples/ft_transfers.rs
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,7 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
save_location: Box::new(PathBuf::from("example_ft_trasnfers_last_block.txt")),
start_height_if_does_not_exist: 114_625_946,
ctrl_c_handler: true,
end: inindexer::AutoContinueEnd::Infinite,
}),
stop_on_error: false,
preprocess_transactions: None,
Expand Down
28 changes: 12 additions & 16 deletions src/tests.rs → src/indexer_tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,8 +7,8 @@ use std::{collections::HashMap, ops::Range, path::PathBuf};
use crate::lake::LakeStreamer;
use crate::{
fastnear_data_server::FastNearDataServerProvider, indexer_utils::MAINNET_GENESIS_BLOCK_HEIGHT,
message_provider::ParallelProviderStreamer, AutoContinue, BlockIterator, CompleteTransaction,
IndexerOptions, PreprocessTransactionsSettings,
message_provider::ParallelProviderStreamer, AutoContinue, AutoContinueEnd, BlockIterator,
CompleteTransaction, IndexerOptions, PreprocessTransactionsSettings,
};
use async_trait::async_trait;
use near_indexer_primitives::{
Expand Down Expand Up @@ -159,45 +159,41 @@ async fn auto_continue() {
let save_file = temp_file::with_contents(MAINNET_GENESIS_BLOCK_HEIGHT.to_string().as_bytes());
let save_path = save_file.path();

let indexer_task = run_indexer(
run_indexer(
&mut indexer,
FastNearDataServerProvider::mainnet(),
IndexerOptions {
range: BlockIterator::AutoContinue(AutoContinue {
save_location: Box::new(PathBuf::from(save_path)),
end: AutoContinueEnd::Count(5),
..Default::default()
}),
preprocess_transactions: None,
..Default::default()
},
);
let timer_task = tokio::time::sleep(std::time::Duration::from_secs(2));
tokio::select! {
_ = indexer_task => {},
_ = timer_task => {},
}
)
.await
.unwrap();

assert!(indexer.last_block_height > MAINNET_GENESIS_BLOCK_HEIGHT);

let current_height = indexer.last_block_height;

let indexer_task = run_indexer(
run_indexer(
&mut indexer,
FastNearDataServerProvider::mainnet(),
IndexerOptions {
range: BlockIterator::AutoContinue(AutoContinue {
save_location: Box::new(PathBuf::from(save_path)),
end: AutoContinueEnd::Count(5),
..Default::default()
}),
preprocess_transactions: None,
..Default::default()
},
);
let timer_task = tokio::time::sleep(std::time::Duration::from_secs(2));
tokio::select! {
_ = indexer_task => {},
_ = timer_task => {},
}
)
.await
.unwrap();

assert!(indexer.last_block_height > current_height);
}
Expand Down
7 changes: 5 additions & 2 deletions src/indexer_utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -514,10 +514,13 @@ fn test_dec_format_vec() {
}

let test_vec = TestVec {
vec: vec![1_000_000_000_000_000_000_000, 2_000_000_000_000_000_000_000]
vec: vec![1_000_000_000_000_000_000_000, 2_000_000_000_000_000_000_000],
};
let serialized = serde_json::to_string(&test_vec).unwrap();
assert_eq!(serialized, r#"["1000000000000000000000","2000000000000000000000"]"#);
assert_eq!(
serialized,
r#"["1000000000000000000000","2000000000000000000000"]"#
);
let deserialized: TestVec = serde_json::from_str(&serialized).unwrap();
assert_eq!(deserialized, test_vec);
}
36 changes: 30 additions & 6 deletions src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,16 +23,17 @@
#[cfg(feature = "fastnear-data-server")]
pub mod fastnear_data_server;
mod indexer_state;
#[cfg(test)]
mod indexer_tests;
pub mod indexer_utils;
#[cfg(feature = "lake")]
pub mod lake;
#[cfg(feature = "message-provider")]
pub mod message_provider;
#[cfg(test)]
mod tests;

use std::{
fmt::{Debug, Display},
ops::Range,
path::PathBuf,
sync::{
atomic::{AtomicBool, AtomicU64, Ordering},
Expand Down Expand Up @@ -144,13 +145,13 @@ pub async fn run_indexer<
match options.range {
BlockIterator::Iterator(range) => (range, None, None),
BlockIterator::AutoContinue(auto_continue) if auto_continue.ctrl_c_handler => (
Box::new(auto_continue.get_start_block().await..)
Box::new(auto_continue.range().await)
as Box<dyn Iterator<Item = BlockHeight> + Send>,
Some(mpsc::channel::<()>(1)),
Some(Box::new(auto_continue)),
),
BlockIterator::AutoContinue(auto_continue) => (
Box::new(auto_continue.get_start_block().await..)
Box::new(auto_continue.range().await)
as Box<dyn Iterator<Item = BlockHeight> + Send>,
None,
Some(Box::new(auto_continue)),
Expand Down Expand Up @@ -353,9 +354,9 @@ impl Default for IndexerOptions {
fn default() -> Self {
Self {
stop_on_error: false,
range: BlockIterator::Iterator(Box::new(std::iter::once_with(|| {
range: BlockIterator::iterator(std::iter::once_with(|| {
panic!("Range is not set in IndexerOptions")
}))),
})),
preprocess_transactions: Some(PreprocessTransactionsSettings::default()),
genesis_block_height: MAINNET_GENESIS_BLOCK_HEIGHT,
}
Expand All @@ -372,6 +373,18 @@ pub struct AutoContinue {
/// If true, the indexer will gracefully stop on Ctrl+C signal, avoiding double processing of transactions.
/// Transactions that have started, but not finished processing, will be processed again on the next run.
pub ctrl_c_handler: bool,
/// If set, the indexer will stop processing blocks after this height. If None, the indexer will process
/// blocks infinitely.
pub end: AutoContinueEnd,
}

pub enum AutoContinueEnd {
/// The indexer will stop processing blocks after this height.
Height(BlockHeight),
/// The indexer will process this many blocks and then stop.
Count(BlockHeightDelta),
/// The indexer will process blocks infinitely.
Infinite,
}

impl AutoContinue {
Expand All @@ -381,6 +394,16 @@ impl AutoContinue {
.await
.unwrap_or(self.start_height_if_does_not_exist)
}

pub async fn range(&self) -> Range<BlockHeight> {
let start = self.get_start_block().await;
let end = match self.end {
AutoContinueEnd::Height(height) => height,
AutoContinueEnd::Count(count) => start + count,
AutoContinueEnd::Infinite => BlockHeight::MAX,
};
start..end
}
}

impl Default for AutoContinue {
Expand All @@ -389,6 +412,7 @@ impl Default for AutoContinue {
save_location: Box::new(PathBuf::from("last-processed-block.txt")),
start_height_if_does_not_exist: MAINNET_GENESIS_BLOCK_HEIGHT,
ctrl_c_handler: true,
end: AutoContinueEnd::Infinite,
}
}
}
Expand Down

0 comments on commit 3a52bff

Please sign in to comment.