Skip to content

Commit

Permalink
Fix: check diff before commiting it to storage (#92)
Browse files Browse the repository at this point in the history
(cherry picked from commit 5c92fae)
  • Loading branch information
birchmd authored and aleksuss committed Aug 30, 2023
1 parent 13b5619 commit 396ac00
Show file tree
Hide file tree
Showing 4 changed files with 169 additions and 67 deletions.
8 changes: 4 additions & 4 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

69 changes: 69 additions & 0 deletions engine/src/batch_tx_processing.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,69 @@
use aurora_engine_sdk::io::IO;
use engine_standalone_storage::{
engine_state::{EngineStateAccess, EngineStorageValue},
Diff,
};
use std::cell::RefCell;

#[derive(Clone, Copy)]
pub struct BatchIO<'db, 'local> {
pub fallback: EngineStateAccess<'db, 'db, 'db>,
pub cumulative_diff: &'local Diff,
pub current_diff: &'local RefCell<Diff>,
}

impl<'db, 'local> IO for BatchIO<'db, 'local> {
type StorageValue = EngineStorageValue<'db>;

fn read_input(&self) -> Self::StorageValue {
self.fallback.read_input()
}

fn return_output(&mut self, value: &[u8]) {
self.fallback.return_output(value)
}

fn read_storage(&self, key: &[u8]) -> Option<Self::StorageValue> {
if let Some(diff) = self
.current_diff
.borrow()
.get(key)
.or_else(|| self.cumulative_diff.get(key))
{
return diff
.value()
.map(|bytes| EngineStorageValue::Vec(bytes.to_vec()));
}
self.fallback.read_storage(key)
}

fn storage_has_key(&self, key: &[u8]) -> bool {
self.read_storage(key).is_some()
}

fn write_storage(&mut self, key: &[u8], value: &[u8]) -> Option<Self::StorageValue> {
let original_value = self.read_storage(key);

self.current_diff
.borrow_mut()
.modify(key.to_vec(), value.to_vec());

original_value
}

fn write_storage_direct(
&mut self,
key: &[u8],
value: Self::StorageValue,
) -> Option<Self::StorageValue> {
self.write_storage(key, value.as_ref())
}

fn remove_storage(&mut self, key: &[u8]) -> Option<Self::StorageValue> {
let original_value = self.read_storage(key);

self.current_diff.borrow_mut().delete(key.to_vec());

original_value
}
}
1 change: 1 addition & 0 deletions engine/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ use std::collections::HashMap;
use std::num::NonZeroUsize;
use std::path::Path;

mod batch_tx_processing;
pub mod gas;
pub mod sync;
#[cfg(test)]
Expand Down
158 changes: 95 additions & 63 deletions engine/src/sync.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,11 +20,10 @@ use engine_standalone_storage::{
BlockMetadata, Diff, Storage,
};
use lru::LruCache;
use std::convert::TryFrom;
use std::{collections::HashMap, str::FromStr};
use std::{cell::RefCell, collections::HashMap, convert::TryFrom, str::FromStr};
use tracing::{debug, warn};

use crate::types::InnerTransactionKind;
use crate::{batch_tx_processing::BatchIO, types::InnerTransactionKind};

pub fn consume_near_block<M: ModExpAlgorithm>(
storage: &mut Storage,
Expand Down Expand Up @@ -241,22 +240,17 @@ pub fn consume_near_block<M: ModExpAlgorithm>(
|| submit_result.as_ref().unwrap() != &expected_result
{
warn!(
"Incorrect result in processing receipt_id={:?} computed={:?} expected={:?}",
receipt_id,
submit_result,
expected_result,
"Incorrect result in processing receipt_id={receipt_id:?} computed differed from expected",
);
}
}
Err(_) => warn!(
"Unable to deserialize receipt_id={:?} as SubmitResult",
receipt_id
"Unable to deserialize receipt_id={receipt_id:?} as SubmitResult",
),
}
}
None => warn!(
"Expected receipt_id={:?} to have a return result, but there was none",
receipt_id
"Expected receipt_id={receipt_id:?} to have a return result, but there was none",
),
}
}
Expand All @@ -265,24 +259,18 @@ pub fn consume_near_block<M: ModExpAlgorithm>(
None => {
if !tx_outcome.diff().is_empty() {
warn!(
"Receipt {:?} not expected to have changes, but standalone computed diff {:?}",
receipt_id, tx_outcome.diff(),
"Receipt {receipt_id:?} not expected to have changes, but standalone computed a non-empty diff",
);
tx_outcome.revert(storage)?;
}
}
Some(expected_diff) => {
if expected_diff != tx_outcome.diff() {
warn!(
"Diff mismatch in receipt_id={:?} computed={:?} ; expected={:?}",
receipt_id,
tx_outcome.diff(),
expected_diff,
);
// Need to delete the incorrect diff before adding the correct diff because it could be
// the case that the incorrect diff wrote some keys that the correct diff did not
// (and these writes need to be undone).
tx_outcome.revert(storage)?;
if expected_diff == tx_outcome.diff() {
// Diff was correct, so commit it to the storage
tx_outcome.commit(storage)?;
} else {
// Diff was incorrect, so log a warning and commit
// the one from the Near block instead
warn!("Receipt {receipt_id:?} diff mismatch with computed diff");
tx_outcome.update_diff(storage, expected_diff)?;
}
}
Expand Down Expand Up @@ -391,40 +379,92 @@ impl TransactionBatch {
..
} => {
let mut non_last_outcomes = Vec::with_capacity(non_last_actions.len());
for tx in non_last_actions {
match sync::consume_message::<M>(storage, Message::Transaction(Box::new(tx)))? {
ConsumeMessageOutcome::TransactionIncluded(tx_outcome) => {
debug!("COMPLETED {:?}", tx_outcome.hash);
non_last_outcomes.push(*tx_outcome);
}
// We sent a transaction message tagged as successful, so we can only get `TransactionIncluded` back
ConsumeMessageOutcome::BlockAdded
| ConsumeMessageOutcome::FailedTransactionIgnored => unreachable!(),
let mut cumulative_diff = Diff::default();

let block_hash = match last_action.as_ref() {
Some(tx_msg) => tx_msg.block_hash,
None => {
// This case should never come up because empty
// batches are thrown out before processing.
return Ok(TransactionBatchOutcome::Batch {
cumulative_diff: Diff::default(),
non_last_outcomes: Vec::new(),
last_outcome: None,
});
}
};
let block_height = storage.get_block_height_by_hash(block_hash)?;
let block_metadata = storage.get_block_metadata(block_hash)?;
let engine_account_id = storage.get_engine_account_id()?;

// We need to use `BatchIO` here instead of simply calling `sync::consume_message` because
// the latter no longer persists to the DB right away (we wait util checking the expected diff first now),
// but a later transaction in a batch can see earlier ones, therefore we need to keep track all
// changes made and expose them as if they had been committed to the DB.
for tx in non_last_actions {
let transaction_position = tx.position;
let local_engine_account_id = engine_account_id.clone();
let (tx_hash, diff, result) = storage
.with_engine_access(block_height, transaction_position, &[], |io| {
let local_diff = RefCell::new(Diff::default());
let batch_io = BatchIO {
fallback: io,
cumulative_diff: &cumulative_diff,
current_diff: &local_diff,
};
sync::execute_transaction::<_, M, _>(
&tx,
block_height,
&block_metadata,
local_engine_account_id,
batch_io,
|x| x.current_diff.borrow().clone(),
)
})
.result;
cumulative_diff.append(diff.clone());
let tx_outcome = TransactionIncludedOutcome {
hash: tx_hash,
info: tx,
diff,
maybe_result: result,
};
debug!("COMPLETED {:?}", tx_outcome.hash);
non_last_outcomes.push(tx_outcome);
}
let last_outcome = match last_action {
None => None,
Some(tx) => {
match sync::consume_message::<M>(
storage,
Message::Transaction(Box::new(tx)),
)? {
ConsumeMessageOutcome::TransactionIncluded(tx_outcome) => {
debug!("COMPLETED {:?}", tx_outcome.hash);
Some(tx_outcome)
}
ConsumeMessageOutcome::BlockAdded
| ConsumeMessageOutcome::FailedTransactionIgnored => unreachable!(),
}
let transaction_position = tx.position;
let (tx_hash, diff, result) = storage
.with_engine_access(block_height, transaction_position, &[], |io| {
let local_diff = RefCell::new(Diff::default());
let batch_io = BatchIO {
fallback: io,
cumulative_diff: &cumulative_diff,
current_diff: &local_diff,
};
sync::execute_transaction::<_, M, _>(
&tx,
block_height,
&block_metadata,
engine_account_id,
batch_io,
|x| x.current_diff.borrow().clone(),
)
})
.result;
cumulative_diff.append(diff.clone());
let tx_outcome = TransactionIncludedOutcome {
hash: tx_hash,
info: tx,
diff,
maybe_result: result,
};
debug!("COMPLETED {:?}", tx_outcome.hash);
Some(Box::new(tx_outcome))
}
};
let cumulative_diff = non_last_outcomes
.iter()
.chain(last_outcome.iter().map(|x| x.as_ref()))
.fold(Diff::default(), |mut acc, outcome| {
acc.append(outcome.diff.clone());
acc
});
Ok(TransactionBatchOutcome::Batch {
cumulative_diff,
non_last_outcomes,
Expand Down Expand Up @@ -454,13 +494,9 @@ impl TransactionBatchOutcome {
}
}

fn revert(&self, storage: &mut Storage) -> Result<(), engine_standalone_storage::Error> {
fn commit(&self, storage: &mut Storage) -> Result<(), engine_standalone_storage::Error> {
match self {
Self::Single(tx_outcome) => storage.revert_transaction_included(
tx_outcome.hash,
&tx_outcome.info,
&tx_outcome.diff,
),
Self::Single(tx_outcome) => tx_outcome.commit(storage),
Self::Batch {
non_last_outcomes,
last_outcome,
Expand All @@ -470,11 +506,7 @@ impl TransactionBatchOutcome {
.iter()
.chain(last_outcome.iter().map(|x| x.as_ref()));
for tx_outcome in all_outcomes {
storage.revert_transaction_included(
tx_outcome.hash,
&tx_outcome.info,
&tx_outcome.diff,
)?
tx_outcome.commit(storage)?
}
Ok(())
}
Expand Down

0 comments on commit 396ac00

Please sign in to comment.