Skip to content

Commit

Permalink
Wrap HistoricalRocksDB in an Arc
Browse files Browse the repository at this point in the history
  • Loading branch information
acerone85 committed Oct 2, 2024
1 parent be48975 commit a2f52af
Showing 1 changed file with 188 additions and 28 deletions.
216 changes: 188 additions & 28 deletions crates/fuel-core/src/state/historical_rocksdb.rs
Original file line number Diff line number Diff line change
Expand Up @@ -90,9 +90,176 @@ pub enum StateRewindPolicy {
RewindRange { size: NonZeroU64 },
}

#[derive(Debug)]
pub struct HistoricalRocksDB<Description>
where
Description: DatabaseDescription,
{
inner: Arc<InnerHistoricalRocksDB<Description>>,
}

impl<Description> HistoricalRocksDB<Description>
where
Description: DatabaseDescription,
{
pub fn new(
db: RocksDb<Historical<Description>>,
state_rewind_policy: StateRewindPolicy,
) -> DatabaseResult<Self> {
let inner = Arc::new(InnerHistoricalRocksDB::new(db, state_rewind_policy)?);
Self::migrate_modifications_history(inner.clone());
Ok(Self { inner })
}

pub fn default_open<P: AsRef<Path>>(
path: P,
capacity: Option<usize>,
state_rewind_policy: StateRewindPolicy,
) -> DatabaseResult<Self> {
let inner = Arc::new(InnerHistoricalRocksDB::default_open(
path,
capacity,
state_rewind_policy,
)?);
Self::migrate_modifications_history(inner.clone());
Ok(Self { inner })
}

fn migrate_modifications_history(
historical_rocksdb: Arc<InnerHistoricalRocksDB<Description>>,
) {
let should_perform_migration =
historical_rocksdb.check_and_update_migration_status();
if !should_perform_migration {
return;
}

tokio::task::spawn_blocking(move || {
let entries = historical_rocksdb.v1_entries();
for entry in entries {
match entry {
Err(e) => {
tracing::error!("Cannot read from database while migrating modification history: {e:?}");
break;
}
Ok((height, _change)) => {
historical_rocksdb
.migrate_modifications_history_at_height(height)
.expect("Accumulating migration changes cannot fail");
}
}
historical_rocksdb.check_and_update_migration_status();
}
});
}

pub fn latest_view(&self) -> RocksDb<Description> {
self.inner.latest_view()
}

pub fn create_view_at(
&self,
height: &Description::Height,
) -> StorageResult<ViewAtHeight<Description>> {
self.inner.create_view_at(height)
}
}

impl<Description> KeyValueInspect for HistoricalRocksDB<Description>
where
Description: DatabaseDescription,
{
type Column = Description::Column;

fn exists(&self, key: &[u8], column: Self::Column) -> StorageResult<bool> {
self.inner.exists(key, column)
}

fn size_of_value(
&self,
key: &[u8],
column: Self::Column,
) -> StorageResult<Option<usize>> {
self.inner.size_of_value(key, column)
}

fn get(&self, key: &[u8], column: Self::Column) -> StorageResult<Option<Value>> {
self.inner.get(key, column)
}

fn read(
&self,
key: &[u8],
column: Self::Column,
buf: &mut [u8],
) -> StorageResult<Option<usize>> {
self.inner.read(key, column, buf)
}
}

impl<'a, Description> IterableStore for HistoricalRocksDB<Description>
where
Description: DatabaseDescription,
{
fn iter_store(
&self,
column: Self::Column,
prefix: Option<&[u8]>,
start: Option<&[u8]>,
direction: IterDirection,
) -> BoxedIter<KVItem> {
self.inner.iter_store(column, prefix, start, direction)
}

fn iter_store_keys(
&self,
column: Self::Column,
prefix: Option<&[u8]>,
start: Option<&[u8]>,
direction: IterDirection,
) -> BoxedIter<fuel_core_storage::kv_store::KeyItem> {
self.inner.iter_store_keys(column, prefix, start, direction)
}
}

impl<Description> TransactableStorage<Description::Height>
for HistoricalRocksDB<Description>
where
Description: DatabaseDescription,
{
fn commit_changes(
&self,
height: Option<Description::Height>,
changes: Changes,
) -> StorageResult<()> {
self.inner.commit_changes(height, changes)
}

fn view_at_height(
&self,
height: &Description::Height,
) -> StorageResult<KeyValueView<ColumnType<Description>>> {
self.inner.view_at_height(height)
}

fn latest_view(
&self,
) -> StorageResult<IterableKeyValueView<ColumnType<Description>>> {
<InnerHistoricalRocksDB<Description> as TransactableStorage<
Description::Height,
>>::latest_view(&*self.inner)
}

fn rollback_block_to(&self, height: &Description::Height) -> StorageResult<()> {
<InnerHistoricalRocksDB<Description> as TransactableStorage<
Description::Height,
>>::rollback_block_to(&*self.inner, height)
}
}

/// Implementation of a database
#[derive(Debug)]
pub struct HistoricalRocksDB<Description> {
struct InnerHistoricalRocksDB<Description> {
/// The [`StateRewindPolicy`] used by the historical rocksdb
state_rewind_policy: StateRewindPolicy,
/// The Description of the database.
Expand All @@ -105,7 +272,7 @@ pub struct HistoricalRocksDB<Description> {
modifications_history_migration_in_progress: Arc<AtomicBool>,
}

impl<Description> HistoricalRocksDB<Description>
impl<Description> InnerHistoricalRocksDB<Description>
where
Description: DatabaseDescription,
{
Expand Down Expand Up @@ -420,10 +587,7 @@ where
/// the mivration from ModificationHistoryV1 to ModificationHistoryV2
/// are simply recorded, and will be flushed to the database when it
/// commits or rollbacks to a new height.
pub fn migrate_modifications_history_at_height(
&self,
height: u64,
) -> StorageResult<()> {
fn migrate_modifications_history_at_height(&self, height: u64) -> StorageResult<()> {
let mut changes_guard = self.migration_changes.lock().map_err(|poisoned| {
StorageError::Other(anyhow::anyhow!("Lock is poisoned: {poisoned:?}"))
})?;
Expand Down Expand Up @@ -461,18 +625,12 @@ where
Ok(())
}

pub fn v1_entries(&self) -> BoxedIter<StorageResult<(u64, Changes)>> {
fn v1_entries(&self) -> BoxedIter<StorageResult<(u64, Changes)>> {
self.db
.iter_all::<ModificationsHistoryV1<Description>>(None)
}

// Signals that the migration is complete
pub fn complete_migration(&self) {
self.modifications_history_migration_in_progress
.store(false, Release);
}

pub fn check_and_update_migration_status(&self) -> bool {
fn check_and_update_migration_status(&self) -> bool {
let result = self.v1_entries().next().is_some();
// Other threads will need to synchronise with this store operation
// when checking the migration status, hence we set the ordering to Release
Expand Down Expand Up @@ -618,7 +776,7 @@ where
Ok(())
}

impl<Description> KeyValueInspect for HistoricalRocksDB<Description>
impl<Description> KeyValueInspect for InnerHistoricalRocksDB<Description>
where
Description: DatabaseDescription,
{
Expand Down Expand Up @@ -650,7 +808,7 @@ where
}
}

impl<'a, Description> IterableStore for HistoricalRocksDB<Description>
impl<'a, Description> IterableStore for InnerHistoricalRocksDB<Description>
where
Description: DatabaseDescription,
{
Expand Down Expand Up @@ -678,7 +836,7 @@ where
}

impl<Description> TransactableStorage<Description::Height>
for HistoricalRocksDB<Description>
for InnerHistoricalRocksDB<Description>
where
Description: DatabaseDescription,
{
Expand Down Expand Up @@ -814,7 +972,8 @@ mod tests {
// Given
let rocks_db = RocksDb::<Historical<OnChain>>::default_open_temp(None).unwrap();
let historical_rocks_db =
HistoricalRocksDB::new(rocks_db, StateRewindPolicy::RewindFullRange).unwrap();
InnerHistoricalRocksDB::new(rocks_db, StateRewindPolicy::RewindFullRange)
.unwrap();

// Set the value at height 1 to be 123.
let mut transaction = historical_rocks_db.read_transaction();
Expand Down Expand Up @@ -854,7 +1013,8 @@ mod tests {
// Given
let rocks_db = RocksDb::<Historical<OnChain>>::default_open_temp(None).unwrap();
let historical_rocks_db =
HistoricalRocksDB::new(rocks_db, StateRewindPolicy::RewindFullRange).unwrap();
InnerHistoricalRocksDB::new(rocks_db, StateRewindPolicy::RewindFullRange)
.unwrap();

// Set the value at height 1 to be 123.
let mut transaction = historical_rocks_db.read_transaction();
Expand Down Expand Up @@ -894,7 +1054,7 @@ mod tests {
// Given
let rocks_db = RocksDb::<Historical<OnChain>>::default_open_temp(None).unwrap();
let historical_rocks_db =
HistoricalRocksDB::new(rocks_db, StateRewindPolicy::NoRewind).unwrap();
InnerHistoricalRocksDB::new(rocks_db, StateRewindPolicy::NoRewind).unwrap();

let mut transaction = historical_rocks_db.read_transaction();
transaction
Expand All @@ -921,7 +1081,7 @@ mod tests {
// Given
let rocks_db = RocksDb::<Historical<OnChain>>::default_open_temp(None).unwrap();
let historical_rocks_db =
HistoricalRocksDB::new(rocks_db, StateRewindPolicy::NoRewind).unwrap();
InnerHistoricalRocksDB::new(rocks_db, StateRewindPolicy::NoRewind).unwrap();

let mut transaction = historical_rocks_db.read_transaction();
transaction
Expand All @@ -943,7 +1103,7 @@ mod tests {
fn state_rewind_policy__rewind_range_1__cleanup_in_range_works() {
// Given
let rocks_db = RocksDb::<Historical<OnChain>>::default_open_temp(None).unwrap();
let historical_rocks_db = HistoricalRocksDB::new(
let historical_rocks_db = InnerHistoricalRocksDB::new(
rocks_db,
StateRewindPolicy::RewindRange {
size: NonZeroU64::new(1).unwrap(),
Expand Down Expand Up @@ -990,7 +1150,7 @@ mod tests {
fn state_rewind_policy__rewind_range_1__rollback_works() {
// Given
let rocks_db = RocksDb::<Historical<OnChain>>::default_open_temp(None).unwrap();
let historical_rocks_db = HistoricalRocksDB::new(
let historical_rocks_db = InnerHistoricalRocksDB::new(
rocks_db,
StateRewindPolicy::RewindRange {
size: NonZeroU64::new(1).unwrap(),
Expand Down Expand Up @@ -1028,7 +1188,7 @@ mod tests {
fn state_rewind_policy__rewind_range_1__rollback_uses_v2() {
// Given
let rocks_db = RocksDb::<Historical<OnChain>>::default_open_temp(None).unwrap();
let historical_rocks_db = HistoricalRocksDB::new(
let historical_rocks_db = InnerHistoricalRocksDB::new(
rocks_db,
StateRewindPolicy::RewindRange {
size: NonZeroU64::new(1).unwrap(),
Expand Down Expand Up @@ -1063,7 +1223,7 @@ mod tests {
fn state_rewind_policy__rewind_range_1__rollback_during_migration_works() {
// Given
let rocks_db = RocksDb::<Historical<OnChain>>::default_open_temp(None).unwrap();
let historical_rocks_db = HistoricalRocksDB::new(
let historical_rocks_db = InnerHistoricalRocksDB::new(
rocks_db,
StateRewindPolicy::RewindRange {
size: NonZeroU64::new(1).unwrap(),
Expand Down Expand Up @@ -1136,7 +1296,7 @@ mod tests {
fn migrate_modifications_history_works() {
// Given
let rocks_db = RocksDb::<Historical<OnChain>>::default_open_temp(None).unwrap();
let historical_rocks_db = HistoricalRocksDB::new(
let historical_rocks_db = InnerHistoricalRocksDB::new(
rocks_db,
StateRewindPolicy::RewindRange {
size: NonZeroU64::new(1).unwrap(),
Expand Down Expand Up @@ -1239,7 +1399,7 @@ mod tests {
fn state_rewind_policy__rewind_range_1__second_rollback_fails() {
// Given
let rocks_db = RocksDb::<Historical<OnChain>>::default_open_temp(None).unwrap();
let historical_rocks_db = HistoricalRocksDB::new(
let historical_rocks_db = InnerHistoricalRocksDB::new(
rocks_db,
StateRewindPolicy::RewindRange {
size: NonZeroU64::new(1).unwrap(),
Expand Down Expand Up @@ -1269,7 +1429,7 @@ mod tests {
const ITERATIONS: usize = 100;

let rocks_db = RocksDb::<Historical<OnChain>>::default_open_temp(None).unwrap();
let historical_rocks_db = HistoricalRocksDB::new(
let historical_rocks_db = InnerHistoricalRocksDB::new(
rocks_db,
StateRewindPolicy::RewindRange {
size: NonZeroU64::new(ITERATIONS as u64).unwrap(),
Expand Down

0 comments on commit a2f52af

Please sign in to comment.