Skip to content

Commit

Permalink
starknet: add Provider method for pending blocks
Browse files Browse the repository at this point in the history
**Summary**

Previously the ingestion engine would fetch pending blocks using the
same method as the accepted/finalized blocks. This method returns an
error if the block is not found, which is often the case for pending
blocks. This resulted in confusing logs for operators.

This commit adds a new method that returns `None` if there is no pending
block, resulting in much clearer logs.
  • Loading branch information
fracek committed Jul 23, 2023
1 parent aa6c405 commit 66fd23a
Show file tree
Hide file tree
Showing 2 changed files with 85 additions and 69 deletions.
81 changes: 38 additions & 43 deletions starknet/src/ingestion/accepted.rs
Original file line number Diff line number Diff line change
Expand Up @@ -250,53 +250,48 @@ where
// some node configurations don't support pending data.
// in that case, simply ignore any error.

match self.provider.get_block(&BlockId::Pending).await {
Err(_) => {
// cannot set pending ingested here because pathfinder returns
// an error if the pending block is not prepared yet.
Ok(())
}
Ok((status, mut header, body)) => {
// pending block is not what was expected. do nothing.
let is_next_pending_block = if let Some(hash) = header.parent_block_hash.as_ref() {
*self.current_head.hash() == hash.into()
} else {
false
};
let Some((status, mut header, body)) = self.provider.get_maybe_block(&BlockId::Pending).await else {
return Ok(())
};

if !is_next_pending_block {
return Ok(());
}
// pending block is not what was expected. do nothing.
let is_next_pending_block = if let Some(hash) = header.parent_block_hash.as_ref() {
*self.current_head.hash() == hash.into()
} else {
false
};

let num_txs = body.transactions.len();
// Use number of transactions as quick way to check if the pending block
// changed.
// Only re-fetch pending block data if it changed.
trace!(
current_size = %num_txs,
previous_size = %self.previous_pending_body_size,
"check if new pending block"
);

if num_txs > self.previous_pending_body_size {
// block number is not set, so do it here.
header.block_number = self.current_head.number() + 1;

// finish ingesting data.
let new_block_id = GlobalBlockId::from_block_header(&header)?;
let mut txn = self.storage.begin_txn()?;
self.downloader
.finish_ingesting_block(&new_block_id, status, header, body, &mut txn)
.await?;
txn.commit()?;

self.previous_pending_body_size = num_txs;
self.publisher.publish_pending(new_block_id)?;
}
if !is_next_pending_block {
return Ok(());
}

Ok(())
}
let num_txs = body.transactions.len();
// Use number of transactions as quick way to check if the pending block
// changed.
// Only re-fetch pending block data if it changed.
trace!(
current_size = %num_txs,
previous_size = %self.previous_pending_body_size,
"check if new pending block"
);

if num_txs > self.previous_pending_body_size {
// block number is not set, so do it here.
header.block_number = self.current_head.number() + 1;

// finish ingesting data.
let new_block_id = GlobalBlockId::from_block_header(&header)?;
let mut txn = self.storage.begin_txn()?;
self.downloader
.finish_ingesting_block(&new_block_id, status, header, body, &mut txn)
.await?;
txn.commit()?;

self.previous_pending_body_size = num_txs;
self.publisher.publish_pending(new_block_id)?;
}

Ok(())
}

#[tracing::instrument(skip(self))]
Expand Down
73 changes: 47 additions & 26 deletions starknet/src/provider.rs
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,12 @@ pub trait Provider {
id: &BlockId,
) -> Result<(v1alpha2::BlockStatus, v1alpha2::BlockHeader, BlockBody), Self::Error>;

/// Get a specific block, return `None` if the block doesn't exist.
async fn get_maybe_block(
&self,
id: &BlockId,
) -> Option<(v1alpha2::BlockStatus, v1alpha2::BlockHeader, BlockBody)>;

/// Get state update for a specific block.
async fn get_state_update(&self, id: &BlockId) -> Result<v1alpha2::StateUpdate, Self::Error>;

Expand Down Expand Up @@ -80,6 +86,39 @@ impl HttpProvider {
let provider = JsonRpcClient::new(http);
HttpProvider { provider }
}

async fn get_block_by_id(
&self,
id: &BlockId,
) -> Result<(v1alpha2::BlockStatus, v1alpha2::BlockHeader, BlockBody), HttpProviderError> {
let block_id: models::BlockId = id.try_into()?;
let block = self
.provider
.get_block_with_txs(block_id)
.await
.map_err(HttpProviderError::from_provider_error)?;

match block {
models::MaybePendingBlockWithTxs::Block(ref block) => {
if id.is_pending() {
return Err(HttpProviderError::UnexpectedPendingBlock);
}
let status = block.to_proto();
let header = block.to_proto();
let body = block.to_proto();
Ok((status, header, body))
}
models::MaybePendingBlockWithTxs::PendingBlock(ref block) => {
if !id.is_pending() {
return Err(HttpProviderError::ExpectedPendingBlock);
}
let status = block.to_proto();
let header = block.to_proto();
let body = block.to_proto();
Ok((status, header, body))
}
}
}
}

impl ProviderError for HttpProviderError {
Expand Down Expand Up @@ -143,33 +182,15 @@ impl Provider for HttpProvider {
&self,
id: &BlockId,
) -> Result<(v1alpha2::BlockStatus, v1alpha2::BlockHeader, BlockBody), Self::Error> {
let block_id: models::BlockId = id.try_into()?;
let block = self
.provider
.get_block_with_txs(block_id)
.await
.map_err(HttpProviderError::from_provider_error)?;
self.get_block_by_id(id).await
}

match block {
models::MaybePendingBlockWithTxs::Block(ref block) => {
if id.is_pending() {
return Err(HttpProviderError::UnexpectedPendingBlock);
}
let status = block.to_proto();
let header = block.to_proto();
let body = block.to_proto();
Ok((status, header, body))
}
models::MaybePendingBlockWithTxs::PendingBlock(ref block) => {
if !id.is_pending() {
return Err(HttpProviderError::ExpectedPendingBlock);
}
let status = block.to_proto();
let header = block.to_proto();
let body = block.to_proto();
Ok((status, header, body))
}
}
#[tracing::instrument(skip(self))]
async fn get_maybe_block(
&self,
id: &BlockId,
) -> Option<(v1alpha2::BlockStatus, v1alpha2::BlockHeader, BlockBody)> {
self.get_block_by_id(id).await.ok()
}

#[tracing::instrument(skip(self), err(Debug))]
Expand Down

0 comments on commit 66fd23a

Please sign in to comment.