Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(indexer): printing block and write timestamps to logs #556

Open
wants to merge 2 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
41 changes: 37 additions & 4 deletions indexer/src/legacy_staking_indexer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -82,6 +82,20 @@ impl<'a> LegacyStakingIndexer<'a> {
}
}

async fn get_timestamp_for_block_number(
&self,
block_number: u64,
client: &Arc<ethers::providers::Provider<ethers::providers::Ws>>,
) -> Result<u64> {
if let Ok(Some(block)) = client.get_block(block_number).await {
return Ok(block.timestamp.as_u64());
}
Err(eyre::eyre!(
"Failed to fetch block timestamp for block {}",
block_number
))
}

async fn listen_for_stake_events(&self, query_start_block: &i32) -> Result<()> {
let client = Arc::new(create_rpc_connection(&self.rpc_url).await);

Expand Down Expand Up @@ -115,19 +129,31 @@ impl<'a> LegacyStakingIndexer<'a> {
IDStakingEvents::SelfStakeFilter(event) => {
let block_number = meta.block_number.as_u32();
let tx_hash = format!("{:?}", meta.transaction_hash);
let block_timestamp = self
.get_timestamp_for_block_number(block_number as u64, &client)
.await?;

self.format_and_save_self_stake_event(
&event,
block_number,
tx_hash,
block_timestamp,
)
.await?;
}
IDStakingEvents::XstakeFilter(event) => {
let block_number = meta.block_number.as_u32();
let tx_hash = format!("{:?}", meta.transaction_hash);
self.format_and_save_x_stake_event(&event, block_number, tx_hash)
.await?
let block_timestamp = self
.get_timestamp_for_block_number(block_number as u64, &client)
.await?;
self.format_and_save_x_stake_event(
&event,
block_number,
tx_hash,
block_timestamp,
)
.await?;
}
_ => {
// Catch all for unhandled events
Expand Down Expand Up @@ -164,14 +190,17 @@ impl<'a> LegacyStakingIndexer<'a> {

let block_number = meta.block_number.as_u32();
let tx_hash = format!("{:?}", meta.transaction_hash);
let block_timestamp = self
.get_timestamp_for_block_number(block_number as u64, &client)
.await?;

match event_value {
IDStakingEvents::SelfStakeFilter(event_value) => {
self.format_and_save_self_stake_event(&event_value, block_number, tx_hash)
self.format_and_save_self_stake_event(&event_value, block_number, tx_hash, block_timestamp)
.await?
}
IDStakingEvents::XstakeFilter(event_value) => {
self.format_and_save_x_stake_event(&event_value, block_number, tx_hash)
self.format_and_save_x_stake_event(&event_value, block_number, tx_hash, block_timestamp)
.await?
}
_ => {
Expand All @@ -188,6 +217,7 @@ impl<'a> LegacyStakingIndexer<'a> {
event: &SelfStakeFilter,
block_number: u32,
transaction_hash: String,
block_timestamp: u64,
) -> Result<()> {
let round_id = event.round_id.as_u32();

Expand All @@ -206,6 +236,7 @@ impl<'a> LegacyStakingIndexer<'a> {
staked,
block_number.try_into().unwrap(),
&transaction_hash,
block_timestamp,
)
.await
{
Expand All @@ -219,6 +250,7 @@ impl<'a> LegacyStakingIndexer<'a> {
event: &XstakeFilter,
block_number: u32,
transaction_hash: String,
block_timestamp: u64,
) -> Result<()> {
// Convert U256 to i32 for round_id
// Be cautious about overflow, and implement a proper check if necessary
Expand All @@ -243,6 +275,7 @@ impl<'a> LegacyStakingIndexer<'a> {
staked,
block_number.try_into().unwrap(),
&transaction_hash,
block_timestamp,
)
.await
{
Expand Down
19 changes: 11 additions & 8 deletions indexer/src/postgres.rs
Original file line number Diff line number Diff line change
Expand Up @@ -50,14 +50,15 @@ impl PostgresClient {
staked: bool,
block_number: i32,
tx_hash: &str,
block_timestamp: u64,
) -> Result<(), Error> {
let mut decimal_amount = Decimal::from_str(amount).unwrap();
let _ = decimal_amount.set_scale(18).unwrap();
let client = self.pool.get().await.unwrap();
client.execute("INSERT INTO registry_gtcstakeevent (event_type, round_id, staker, amount, staked, block_number, tx_hash) VALUES ($1, $2, $3, $4, $5, $6, $7)",&[&"SelfStake", &round_id, &staker, &decimal_amount, &staked, &block_number, &tx_hash]).await?;
println!(
"Row inserted into registry_gtcstakeevent with type SelfStake for block {}!",
block_number
"Row inserted into registry_gtcstakeevent with type SelfStake for block {} with block time {} at {}!",
block_number, self.unix_time_to_datetime(&block_timestamp), Utc::now()
);
Ok(())
}
Expand All @@ -72,14 +73,15 @@ impl PostgresClient {
staked: bool,
block_number: i32,
tx_hash: &str,
block_timestamp: u64,
) -> Result<(), Error> {
let mut decimal_amount = Decimal::from_str(amount).unwrap();
let _ = decimal_amount.set_scale(18).unwrap();
let client = self.pool.get().await.unwrap();
client.execute("INSERT INTO registry_gtcstakeevent (event_type, round_id, staker, address, amount, staked, block_number, tx_hash) VALUES ($1, $2, $3, $4, $5, $6, $7, $8)", &[&"Xstake", &round_id, &staker, &user, &decimal_amount, &staked, &block_number, &tx_hash]).await?;
println!(
"Row inserted into registry_gtcstakeevent with type Xstake for block {}!",
block_number
"Row inserted into registry_gtcstakeevent with type Xstake for block {} with block time {} at {}!",
block_number, self.unix_time_to_datetime(&block_timestamp), Utc::now()
);
Ok(())
}
Expand Down Expand Up @@ -134,8 +136,8 @@ impl PostgresClient {
).await?;

println!(
"Added or extended stake in block {} on chain {}!",
block_number, chain_id
"Added or extended stake in block {} on chain {} with block time {} at {}!",
block_number, chain_id, lock_time, Utc::now()
);

Ok(())
Expand All @@ -151,6 +153,7 @@ impl PostgresClient {
operation: StakeAmountOperation,
block_number: &u64,
tx_hash: &str,
block_timestamp: u64,
) -> Result<(), Error> {
let chain_id: i32 = chain_id as i32;
let staker = format!("{:#x}", staker);
Expand Down Expand Up @@ -189,8 +192,8 @@ impl PostgresClient {
.await?;

println!(
"Modified stake amount in block {} on chain {}!",
block_number, chain_id
"Modified stake amount in block {} on chain {} at block time {} at {}!",
block_number, chain_id, self.unix_time_to_datetime(&block_timestamp), Utc::now()
);

Ok(())
Expand Down
37 changes: 21 additions & 16 deletions indexer/src/staking_indexer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -51,8 +51,8 @@ impl<'a> StakingIndexer<'a> {
loop {
let start_block = self.postgres_client.get_latest_block(self.chain_id, self.start_block).await?;
println!(
"Debug - Starting indexer for chain {} at block {}",
self.chain_id, start_block
"Debug - Starting indexer for chain {} at block {} at {}",
self.chain_id, start_block, chrono::Utc::now()
);

match try_join!(
Expand Down Expand Up @@ -194,30 +194,31 @@ impl<'a> StakingIndexer<'a> {
) -> Result<()> {
let block_number = meta.block_number.as_u64();
let tx_hash = format!("{:?}", meta.transaction_hash);
let block_timestamp = self.get_timestamp_for_block_number(block_number, client).await?;

match event {
IdentityStakingEvents::SelfStakeFilter(event) => {
self.process_self_stake_event(&event, block_number, &tx_hash, &client)
self.process_self_stake_event(&event, block_number, &tx_hash, block_timestamp)
.await
}
IdentityStakingEvents::CommunityStakeFilter(event) => {
self.process_community_stake_event(&event, block_number, &tx_hash, &client)
self.process_community_stake_event(&event, block_number, &tx_hash, block_timestamp)
.await
}
IdentityStakingEvents::SelfStakeWithdrawnFilter(event) => {
self.process_self_stake_withdrawn_event(&event, block_number, &tx_hash)
self.process_self_stake_withdrawn_event(&event, block_number, &tx_hash, block_timestamp)
.await
}
IdentityStakingEvents::CommunityStakeWithdrawnFilter(event) => {
self.process_community_stake_withdrawn_event(&event, block_number, &tx_hash)
self.process_community_stake_withdrawn_event(&event, block_number, &tx_hash, block_timestamp)
.await
}
IdentityStakingEvents::SlashFilter(event) => {
self.process_slash_event(&event, block_number, &tx_hash)
self.process_slash_event(&event, block_number, &tx_hash, block_timestamp)
.await
}
IdentityStakingEvents::ReleaseFilter(event) => {
self.process_release_event(&event, block_number, &tx_hash)
self.process_release_event(&event, block_number, &tx_hash, block_timestamp)
.await
}
_ => {
Expand Down Expand Up @@ -251,10 +252,8 @@ impl<'a> StakingIndexer<'a> {
event: &SelfStakeFilter,
block_number: u64,
tx_hash: &String,
client: &Arc<ethers::providers::Provider<ethers::providers::Ws>>,
block_timestamp: u64,
) -> Result<()> {
let timestamp = self.get_timestamp_for_block_number(block_number, client).await?;

if let Err(err) = self
.postgres_client
.add_or_extend_stake(
Expand All @@ -264,7 +263,7 @@ impl<'a> StakingIndexer<'a> {
&event.staker,
&event.amount,
&event.unlock_time,
&timestamp,
&block_timestamp,
&block_number,
tx_hash,
)
Expand All @@ -283,10 +282,8 @@ impl<'a> StakingIndexer<'a> {
event: &CommunityStakeFilter,
block_number: u64,
tx_hash: &String,
client: &Arc<ethers::providers::Provider<ethers::providers::Ws>>,
block_timestamp: u64,
) -> Result<()> {
let timestamp = self.get_timestamp_for_block_number(block_number, client).await?;

if let Err(err) = self
.postgres_client
.add_or_extend_stake(
Expand All @@ -296,7 +293,7 @@ impl<'a> StakingIndexer<'a> {
&event.stakee,
&event.amount,
&event.unlock_time,
&timestamp,
&block_timestamp,
&block_number,
tx_hash,
)
Expand All @@ -315,6 +312,7 @@ impl<'a> StakingIndexer<'a> {
event: &SelfStakeWithdrawnFilter,
block_number: u64,
tx_hash: &String,
block_timestamp: u64,
) -> Result<()> {
if let Err(err) = self
.postgres_client
Expand All @@ -327,6 +325,7 @@ impl<'a> StakingIndexer<'a> {
StakeAmountOperation::Subtract,
&block_number,
tx_hash,
block_timestamp,
)
.await
{
Expand All @@ -343,6 +342,7 @@ impl<'a> StakingIndexer<'a> {
event: &CommunityStakeWithdrawnFilter,
block_number: u64,
tx_hash: &String,
block_timestamp: u64,
) -> Result<()> {
if let Err(err) = self
.postgres_client
Expand All @@ -355,6 +355,7 @@ impl<'a> StakingIndexer<'a> {
StakeAmountOperation::Subtract,
&block_number,
tx_hash,
block_timestamp,
)
.await
{
Expand All @@ -371,6 +372,7 @@ impl<'a> StakingIndexer<'a> {
event: &SlashFilter,
block_number: u64,
tx_hash: &String,
block_timestamp: u64,
) -> Result<()> {
if let Err(err) = self
.postgres_client
Expand All @@ -383,6 +385,7 @@ impl<'a> StakingIndexer<'a> {
StakeAmountOperation::Subtract,
&block_number,
tx_hash,
block_timestamp,
)
.await
{
Expand All @@ -399,6 +402,7 @@ impl<'a> StakingIndexer<'a> {
event: &ReleaseFilter,
block_number: u64,
tx_hash: &String,
block_timestamp: u64,
) -> Result<()> {
if let Err(err) = self
.postgres_client
Expand All @@ -411,6 +415,7 @@ impl<'a> StakingIndexer<'a> {
StakeAmountOperation::Add,
&block_number,
tx_hash,
block_timestamp,
)
.await
{
Expand Down
Loading