From 0c84d5c0f5b3b7bd1d5fcc13fb6c59a415edeb2e Mon Sep 17 00:00:00 2001
From: snorochevskiy
Date: Tue, 22 Oct 2024 13:42:20 +0300
Subject: [PATCH] MTG-750 Adding bubblegum checksums storage

---
 nft_ingester/src/bin/ingester/main.rs          |   1 +
 .../src/bubblegum_updates_processor.rs         |   7 +
 nft_ingester/src/consistency_calculator.rs     | 199 +++
 nft_ingester/src/fork_cleaner.rs               |  20 +-
 nft_ingester/src/lib.rs                        |   2 +
 nft_ingester/src/solana_slot_service.rs        |  50 ++
 nft_ingester/tests/clean_forks_test.rs         |   2 +
 .../tests/consistency_calculator_test.rs       | 104 ++++
 rocks-db/Cargo.toml                            |   1 +
 rocks-db/src/lib.rs                            |  41 ++
 rocks-db/src/storage_consistency.rs            | 492 ++++++++++++++++++
 rocks-db/src/transaction.rs                    |   2 +
 rocks-db/src/transaction_client.rs             |   1 +
 13 files changed, 921 insertions(+), 1 deletion(-)
 create mode 100644 nft_ingester/src/consistency_calculator.rs
 create mode 100644 nft_ingester/src/solana_slot_service.rs
 create mode 100644 nft_ingester/tests/consistency_calculator_test.rs
 create mode 100644 rocks-db/src/storage_consistency.rs

diff --git a/nft_ingester/src/bin/ingester/main.rs b/nft_ingester/src/bin/ingester/main.rs
index 5ab999fc..803642c2 100644
--- a/nft_ingester/src/bin/ingester/main.rs
+++ b/nft_ingester/src/bin/ingester/main.rs
@@ -686,6 +686,7 @@ pub async fn main() -> Result<(), IngesterError> {
     }
 
     let fork_cleaner = ForkCleaner::new(
+        primary_rocks_storage.clone(),
         primary_rocks_storage.clone(),
         primary_rocks_storage.clone(),
         metrics_state.fork_cleaner_metrics.clone(),
diff --git a/nft_ingester/src/bubblegum_updates_processor.rs b/nft_ingester/src/bubblegum_updates_processor.rs
index fe709dfd..2061a536 100644
--- a/nft_ingester/src/bubblegum_updates_processor.rs
+++ b/nft_ingester/src/bubblegum_updates_processor.rs
@@ -112,6 +112,9 @@ impl BubblegumTxProcessor {
             .await
             .map_err(|e| IngesterError::DatabaseError(e.to_string()));
 
+        self.calc_checksums_if_needed(&result.instruction_results)
+            .await;
+
         result_to_metrics(self.metrics.clone(), &res, "process_transaction");
         self.metrics.set_latency(
             "process_transaction",
@@ -1169,6 +1172,10 @@ impl BubblegumTxProcessor {
 
         Ok(())
     }
+
+    async fn calc_checksums_if_needed(&self, instructions: &[InstructionResult]) {
+        // TODO: if there was a change with a slot from the previous epoch, emit a checksum recalc
+    }
 }
 
 fn use_method_from_mpl_bubblegum_state(
diff --git a/nft_ingester/src/consistency_calculator.rs b/nft_ingester/src/consistency_calculator.rs
new file mode 100644
index 00000000..72b5369c
--- /dev/null
+++ b/nft_ingester/src/consistency_calculator.rs
@@ -0,0 +1,199 @@
+use rocks_db::{
+    column::TypedColumn,
+    storage_consistency::{
+        self, grand_epoch_of_epoch, BubblegumChange, BubblegumChangeKey, BubblegumEpoch,
+        BubblegumEpochKey, BubblegumGrandEpoch, BubblegumGrandEpochKey,
+    },
+    Storage,
+};
+use solana_sdk::{hash::Hasher, pubkey::Pubkey};
+use std::{sync::Arc, time::Duration};
+use storage_consistency::{BUBBLEGUM_EPOCH_CALCULATING, BUBBLEGUM_GRAND_EPOCH_CALCULATING};
+use tokio::{sync::mpsc::UnboundedReceiver, task::JoinHandle};
+
+pub enum ConsistencyCalcMsg {
+    EpochChanged { epoch: u32 },
+    BubblegumUpdated { tree: Pubkey, slot: u64 },
+    AccUpdated { account: Pubkey, slot: u64 },
+}
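Before the calculator implementation that follows, a quick orientation sketch of how the pieces are meant to be wired together. This is hypothetical glue code, not part of the patch: the channel creation and the call site in `main` are assumptions.

    // Sketch: wiring the consistency calculator into the ingester's main.
    let (calc_tx, calc_rx) = tokio::sync::mpsc::unbounded_channel::<ConsistencyCalcMsg>();
    run_bg_consistency_calculator(calc_rx, primary_rocks_storage.clone()).await;
    // A notifier such as SolanaSlotService (added below) keeps `calc_tx` and sends
    // an event whenever it observes that an epoch boundary has been crossed:
    let _ = calc_tx.send(ConsistencyCalcMsg::EpochChanged { epoch: 1 });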
+/// An entry point for the checksum calculation component.
+/// Should be called from "main".
+pub async fn run_bg_consistency_calculator(
+    mut rcv: UnboundedReceiver<ConsistencyCalcMsg>,
+    storage: Arc<Storage>,
+) {
+    tokio::spawn(async move {
+        let mut _bubblegum_task = None;
+
+        // Placeholder scheduling; the full logic is still to be implemented.
+        loop {
+            match rcv.recv().await {
+                Some(msg) => match msg {
+                    ConsistencyCalcMsg::EpochChanged { epoch } => {
+                        // TODO: Schedule the epoch calc. Should it run immediately, or wait, since more data can come?
+                        let t = schedule_bubblegum_calc(
+                            Duration::from_secs(300),
+                            storage.clone(),
+                            epoch,
+                        );
+                        _bubblegum_task = Some(t);
+                    }
+                    ConsistencyCalcMsg::BubblegumUpdated { tree: _, slot: _ } => todo!(),
+                    ConsistencyCalcMsg::AccUpdated {
+                        account: _,
+                        slot: _,
+                    } => todo!(),
+                },
+                None => break,
+            }
+        }
+    });
+}
+
+/// After a given lag, launches checksum calculation for the given epoch.
+///
+/// ## Args:
+/// * `start_lag` - time to wait before the actual calculation.
+///   Required because we might want to wait for late updates.
+/// * `storage` - database
+/// * `epoch` - the epoch the calculation should be done for
+fn schedule_bubblegum_calc(
+    start_lag: Duration,
+    storage: Arc<Storage>,
+    epoch: u32,
+) -> JoinHandle<()> {
+    tokio::spawn(async move {
+        tokio::time::sleep(start_lag).await;
+        calc_bubblegum_checksums(&storage, epoch).await;
+    })
+}
+
+pub async fn calc_bubblegum_checksums(storage: &Storage, epoch: u32) {
+    calc_bubblegum_epoch(storage, epoch).await;
+    calc_bubblegum_grand_epoch(storage, grand_epoch_of_epoch(epoch)).await;
+}
+
+/// Calculates and stores bubblegum epoch checksums for bubblegum updates
+/// received during the given epoch.
+///
+/// ## Args:
+/// * `storage` - database
+/// * `target_epoch` - the number of the epoch the checksum should be calculated for
+async fn calc_bubblegum_epoch(storage: &Storage, target_epoch: u32) {
+    // Iterate over changes and calculate the checksum per tree.
+
+    let mut current_tree: Option<Pubkey> = None;
+
+    let mut it = storage
+        .bubblegum_changes
+        .iter(BubblegumChangeKey::epoch_start_key(target_epoch));
+    let mut hasher = Hasher::default();
+
+    while let Some(Ok((k, v))) = it.next() {
+        let Ok(change_key) = BubblegumChange::decode_key(k.to_vec()) else {
+            continue;
+        };
+        if change_key.epoch > target_epoch {
+            break;
+        }
+        if current_tree != Some(change_key.tree_pubkey) {
+            if current_tree.is_some() {
+                // write checksum for the previous tree
+                let epoch_key = BubblegumEpochKey::new(current_tree.unwrap(), target_epoch);
+
+                let epoch_val = BubblegumEpoch::from(hasher.result().to_bytes());
+                let _ = storage
+                    .bubblegum_epochs
+                    .merge(epoch_key.clone(), epoch_val)
+                    .await;
+
+                if let Ok(Some(storage_consistency::BUBBLEGUM_EPOCH_INVALIDATED)) =
+                    storage.bubblegum_epochs.get(epoch_key)
+                {
+                    // TODO: Means more changes for the tree arrived while the epoch checksum
+                    // calculation was in progress. How to handle?
+                }
+            }
+            current_tree = Some(change_key.tree_pubkey);
+
+            let new_epoch_key = BubblegumEpochKey::new(current_tree.unwrap(), target_epoch);
+            let _ = storage
+                .bubblegum_epochs
+                .put(new_epoch_key, BUBBLEGUM_EPOCH_CALCULATING);
+
+            hasher = Hasher::default();
+        }
+        hasher.hash(&k);
+        hasher.hash(&v);
+    }
+
+    if let Some(current_tree) = current_tree {
+        let epoch_key = BubblegumEpochKey {
+            tree_pubkey: current_tree,
+            epoch_num: target_epoch,
+        };
+        let epoch_val = BubblegumEpoch::from(hasher.result().to_bytes());
+        let _ = storage.bubblegum_epochs.merge(epoch_key, epoch_val).await;
+    }
+}
+
+async fn calc_bubblegum_grand_epoch(storage: &Storage, target_grand_epoch: u16) {
+    let mut current_tree: Option<Pubkey> = None;
+
+    let mut it = storage
+        .bubblegum_epochs
+        .iter(BubblegumEpochKey::grand_epoch_start_key(target_grand_epoch));
+    let mut hasher = Hasher::default();
+
+    while let Some(Ok((k, v))) = it.next() {
+        let Ok(epoch_key) = BubblegumEpoch::decode_key(k.to_vec()) else {
+            continue;
+        };
+        let element_grand_epoch = grand_epoch_of_epoch(epoch_key.epoch_num);
+        if element_grand_epoch > target_grand_epoch {
+            break;
+        }
+        if current_tree != Some(epoch_key.tree_pubkey) {
+            if current_tree.is_some() {
+                // write checksum for the previous tree
+                let grand_epoch_key =
+                    BubblegumGrandEpochKey::new(current_tree.unwrap(), target_grand_epoch);
+                let grand_epoch_val = BubblegumGrandEpoch::from(hasher.result().to_bytes());
+                let _ = storage
+                    .bubblegum_grand_epochs
+                    .merge(grand_epoch_key.clone(), grand_epoch_val)
+                    .await;
+
+                if let Ok(Some(storage_consistency::BUBBLEGUM_GRAND_EPOCH_INVALIDATED)) =
+                    storage.bubblegum_grand_epochs.get(grand_epoch_key)
+                {
+                    // TODO: same question as on the epoch level - more changes arrived
+                    // while the grand epoch checksum was being calculated. How to handle?
+                }
+            }
+            current_tree = Some(epoch_key.tree_pubkey);
+
+            let new_grand_epoch_key =
+                BubblegumGrandEpochKey::new(current_tree.unwrap(), target_grand_epoch);
+            let _ = storage
+                .bubblegum_grand_epochs
+                .put(new_grand_epoch_key, BUBBLEGUM_GRAND_EPOCH_CALCULATING);
+
+            hasher = Hasher::default();
+        }
+        hasher.hash(&k);
+        hasher.hash(&v);
+    }
+
+    if let Some(current_tree) = current_tree {
+        let grand_epoch_key = BubblegumGrandEpochKey {
+            tree_pubkey: current_tree,
+            grand_epoch_num: target_grand_epoch,
+        };
+        let grand_epoch_val = BubblegumGrandEpoch::from(hasher.result().to_bytes());
+        let _ = storage
+            .bubblegum_grand_epochs
+            .merge(grand_epoch_key, grand_epoch_val)
+            .await;
+    }
+}
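The module documentation in rocks-db/src/storage_consistency.rs (further down) explains the intent: peers exchange these per-tree checksums to detect missed changes. The exchange itself is not part of this patch; purely as an illustration of how the grand-epoch checksums would be consumed, a hypothetical reconciliation helper:

    use std::collections::HashMap;
    use solana_sdk::pubkey::Pubkey;

    /// Hypothetical: compare our grand-epoch checksums with a peer's and return
    /// the trees that disagree and need a drill-down comparison
    /// (grand epoch -> epoch -> individual changes).
    fn trees_to_reconcile(
        ours: &HashMap<Pubkey, [u8; 32]>,
        theirs: &HashMap<Pubkey, [u8; 32]>,
    ) -> Vec<Pubkey> {
        let mut out = Vec::new();
        for (tree, their_checksum) in theirs {
            if ours.get(tree) != Some(their_checksum) {
                out.push(*tree);
            }
        }
        out
    }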
diff --git a/nft_ingester/src/fork_cleaner.rs b/nft_ingester/src/fork_cleaner.rs
index 8ced519c..e4cf965a 100644
--- a/nft_ingester/src/fork_cleaner.rs
+++ b/nft_ingester/src/fork_cleaner.rs
@@ -1,6 +1,8 @@
 use entities::models::ForkedItem;
 use interface::fork_cleaner::{CompressedTreeChangesManager, ForkChecker};
 use metrics_utils::ForkCleanerMetricsConfig;
+use rocks_db::storage_consistency::BubblegumChangeKey;
+use rocks_db::storage_consistency::DataConsistencyStorage;
 use rocks_db::Storage;
 use solana_sdk::pubkey::Pubkey;
 use solana_sdk::signature::Signature;
@@ -46,6 +48,7 @@ where
 {
     cl_items_manager: Arc<CM>,
     fork_checker: Arc<FC>,
+    data_consistency_storage: Arc<dyn DataConsistencyStorage>,
     metrics: Arc<ForkCleanerMetricsConfig>,
 }
@@ -57,11 +60,13 @@ where
     pub fn new(
         cl_items_manager: Arc<CM>,
         fork_checker: Arc<FC>,
+        data_consistency_storage: Arc<dyn DataConsistencyStorage>,
         metrics: Arc<ForkCleanerMetricsConfig>,
     ) -> Self {
         Self {
             cl_items_manager,
             fork_checker,
+            data_consistency_storage,
             metrics,
         }
     }
@@ -75,6 +80,7 @@ where
 
         let mut forked_slots = 0;
         let mut delete_items = Vec::new();
+        let mut changes_to_delete = Vec::new();
         let mut signatures_to_drop = Vec::new();
@@ -133,7 +139,7 @@ where
                 // dropping only sequence 5 would result in an incorrect update during backfill.
                 // therefore, we need to drop sequence 4 as well. Sequence 5 must be dropped because
                 // it contains a different tree update in the main branch
-                for sequences in signature.slot_sequences.values() {
+                for (slot, sequences) in signature.slot_sequences.iter() {
                     for seq in sequences {
                         delete_items.push(ForkedItem {
                             tree: signature.tree,
@@ -142,6 +148,12 @@ where
                             // because deletion will happen by tree and seq values
                             node_idx: 0,
                         });
+
+                        changes_to_delete.push(BubblegumChangeKey::new(
+                            signature.tree,
+                            *slot,
+                            *seq,
+                        ));
                     }
                 }
             }
@@ -150,11 +162,17 @@ where
             }
 
             if delete_items.len() >= CI_ITEMS_DELETE_BATCH_SIZE {
+                self.data_consistency_storage
+                    .drop_forked_bubblegum_changes(&changes_to_delete)
+                    .await;
                 self.delete_tree_seq_idx(&mut delete_items).await;
             }
         }
 
         if !delete_items.is_empty() {
+            self.data_consistency_storage
+                .drop_forked_bubblegum_changes(&changes_to_delete)
+                .await;
             self.delete_tree_seq_idx(&mut delete_items).await;
         }
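For illustration, this is roughly what the cleaner now hands over when it detects forked sequences (hypothetical values; assumes a `tree: Pubkey` and a `data_consistency_storage: Arc<dyn DataConsistencyStorage>` in scope):

    // Sketch: drop the change records of two forked sequences of one tree.
    // Besides deleting the records, the Storage implementation also marks the
    // affected epoch (and, for old epochs, grand epoch) checksums as Invalidated.
    let forked = vec![
        BubblegumChangeKey::new(tree, 12_345, 4),
        BubblegumChangeKey::new(tree, 12_346, 5),
    ];
    data_consistency_storage
        .drop_forked_bubblegum_changes(&forked)
        .await;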
diff --git a/nft_ingester/src/lib.rs b/nft_ingester/src/lib.rs
index acd2797e..c33251e6 100644
--- a/nft_ingester/src/lib.rs
+++ b/nft_ingester/src/lib.rs
@@ -6,6 +6,7 @@ pub mod batch_mint;
 pub mod bubblegum_updates_processor;
 pub mod buffer;
 pub mod config;
+pub mod consistency_calculator;
 pub mod error;
 pub mod flatbuffer_mapper;
 pub mod fork_cleaner;
@@ -27,6 +28,7 @@ pub mod redis_receiver;
 pub mod rocks_db;
 pub mod scheduler;
 pub mod sequence_consistent;
+pub mod solana_slot_service;
 pub mod tcp_receiver;
 pub mod token_updates_processor;
 pub mod transaction_ingester;
diff --git a/nft_ingester/src/solana_slot_service.rs b/nft_ingester/src/solana_slot_service.rs
new file mode 100644
index 00000000..7c6250ff
--- /dev/null
+++ b/nft_ingester/src/solana_slot_service.rs
@@ -0,0 +1,50 @@
+use std::{sync::Arc, time::Duration};
+
+use rocks_db::storage_consistency::{
+    current_estimated_epoch, epoch_of_slot, slots_to_next_epoch, update_estimated_epoch,
+};
+use solana_client::nonblocking::rpc_client::RpcClient;
+
+use crate::consistency_calculator::ConsistencyCalcMsg;
+
+/// Average Solana slot time, in milliseconds.
+const SLOT_TIME: u64 = 400;
+
+/// Background job that monitors the current Solana slot number
+/// and notifies downstream components about epoch changes.
+/// This component is an optional part of the p2p consistency checking mechanism.
+pub struct SolanaSlotService {
+    rpc_client: Arc<RpcClient>,
+    epoch_changed_notifier: tokio::sync::mpsc::UnboundedSender<ConsistencyCalcMsg>,
+}
+
+/// Minimal pause, in seconds, between slot checks.
+const MIN_LAG_TO_CHECK_SLOT: u64 = 60;
+
+impl SolanaSlotService {
+    async fn run_epoch_changed_notifier(&self) {
+        loop {
+            let estimated_epoch = current_estimated_epoch();
+
+            let Ok(solana_slot) = self.rpc_client.get_slot().await else {
+                tokio::time::sleep(Duration::from_secs(MIN_LAG_TO_CHECK_SLOT)).await;
+                continue;
+            };
+            let current_epoch = epoch_of_slot(solana_slot);
+
+            if current_epoch > estimated_epoch {
+                update_estimated_epoch(current_epoch);
+                let _ = self
+                    .epoch_changed_notifier
+                    .send(ConsistencyCalcMsg::EpochChanged {
+                        epoch: current_epoch,
+                    });
+            } else {
+                let wait_more_slots = slots_to_next_epoch(solana_slot);
+                let mut seconds_to_wait = (wait_more_slots * SLOT_TIME) / 1000;
+                if seconds_to_wait < MIN_LAG_TO_CHECK_SLOT {
+                    seconds_to_wait += MIN_LAG_TO_CHECK_SLOT;
+                }
+                tokio::time::sleep(Duration::from_secs(seconds_to_wait)).await;
+            }
+        }
+    }
+}
diff --git a/nft_ingester/tests/clean_forks_test.rs b/nft_ingester/tests/clean_forks_test.rs
index 2bf33689..d3885f3e 100644
--- a/nft_ingester/tests/clean_forks_test.rs
+++ b/nft_ingester/tests/clean_forks_test.rs
@@ -641,6 +641,7 @@ async fn test_clean_forks() {
     let (_shutdown_tx, shutdown_rx) = broadcast::channel::<()>(1);
     let rx = shutdown_rx.resubscribe();
     let fork_cleaner = ForkCleaner::new(
+        storage.clone(),
         storage.clone(),
         storage.clone(),
         metrics_state.fork_cleaner_metrics.clone(),
@@ -970,6 +971,7 @@ async fn test_process_forked_transaction() {
     let (_shutdown_tx, shutdown_rx) = broadcast::channel::<()>(1);
     let fork_cleaner = ForkCleaner::new(
+        storage.clone(),
         storage.clone(),
         storage.clone(),
         metrics_state.fork_cleaner_metrics.clone(),
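The back-off arithmetic of run_epoch_changed_notifier in nft_ingester/src/solana_slot_service.rs above, extracted into a pure function with example values (hypothetical test; assumes SLOT_TIME, MIN_LAG_TO_CHECK_SLOT, and slots_to_next_epoch, which returns the number of slots remaining in the current epoch):

    fn seconds_to_wait(solana_slot: u64) -> u64 {
        let wait_more_slots = slots_to_next_epoch(solana_slot);
        let mut seconds = (wait_more_slots * SLOT_TIME) / 1000;
        if seconds < MIN_LAG_TO_CHECK_SLOT {
            seconds += MIN_LAG_TO_CHECK_SLOT;
        }
        seconds
    }

    #[test]
    fn wait_time_examples() {
        // 1_000 slots left in the epoch: 1_000 * 400 ms = 400 s.
        assert_eq!(seconds_to_wait(9_000), 400);
        // 50 slots left: 20 s is below the minimal lag, so 60 s are added.
        assert_eq!(seconds_to_wait(9_950), 80);
    }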
diff --git a/nft_ingester/tests/consistency_calculator_test.rs b/nft_ingester/tests/consistency_calculator_test.rs
new file mode 100644
index 00000000..2bb14a35
--- /dev/null
+++ b/nft_ingester/tests/consistency_calculator_test.rs
@@ -0,0 +1,104 @@
+#[cfg(test)]
+mod tests {
+    use nft_ingester::consistency_calculator::calc_bubblegum_checksums;
+    use rocks_db::{
+        column::TypedColumn,
+        storage_consistency::{
+            BubblegumChange, BubblegumChangeKey, BubblegumEpoch, BubblegumEpochKey,
+            BubblegumGrandEpochKey, Checksum,
+        },
+    };
+    use setup::rocks::RocksTestEnvironment;
+    use solana_sdk::{hash::Hasher, pubkey::Pubkey};
+
+    /// This test checks that the checksum calculation for the bubblegum contract
+    /// calculates the checksum for the given epoch,
+    /// and only for the given epoch.
+    #[tokio::test]
+    async fn test_calc_epoch() {
+        // prepare
+        let tree1 = Pubkey::new_unique();
+
+        let storage = RocksTestEnvironment::new(&[]).storage;
+
+        // This change is for an epoch we won't calculate in the test;
+        // it is added just to verify that it is ignored.
+        let k0_1 = BubblegumChangeKey::new(tree1, 111, 1);
+        let v0_1 = BubblegumChange {
+            signature: "1".to_string(),
+        };
+        storage
+            .bubblegum_changes
+            .put(k0_1.clone(), v0_1.clone())
+            .unwrap();
+
+        // Add the bubblegum changes the checksum is calculated from.
+        let k1_1 = BubblegumChangeKey::new(tree1, 10111, 2);
+        let v1_1 = BubblegumChange {
+            signature: "2".to_string(),
+        };
+        storage
+            .bubblegum_changes
+            .put(k1_1.clone(), v1_1.clone())
+            .unwrap();
+
+        let k1_2 = BubblegumChangeKey::new(tree1, 10112, 3);
+        let v1_2 = BubblegumChange {
+            signature: "3".to_string(),
+        };
+        storage
+            .bubblegum_changes
+            .put(k1_2.clone(), v1_2.clone())
+            .unwrap();
+
+        // This one will also be ignored.
+        let k2_1 = BubblegumChangeKey::new(tree1, 20000, 4);
+        let v2_1 = BubblegumChange {
+            signature: "4".to_string(),
+        };
+        storage
+            .bubblegum_changes
+            .put(k2_1.clone(), v2_1.clone())
+            .unwrap();
+
+        // Calculate the epoch and grand epoch checksums.
+        calc_bubblegum_checksums(&storage, 1).await;
+
+        let expected_epoch_checksum = {
+            let mut hasher = Hasher::default();
+            hasher.hash(&BubblegumChange::encode_key(k1_1));
+            hasher.hash(&bincode::serialize(&v1_1).unwrap());
+            hasher.hash(&BubblegumChange::encode_key(k1_2));
+            hasher.hash(&bincode::serialize(&v1_2).unwrap());
+            hasher.result().to_bytes()
+        };
+
+        let epoch_key = BubblegumEpochKey::new(tree1, 1);
+        let epoch_val = storage
+            .bubblegum_epochs
+            .get(epoch_key.clone())
+            .unwrap()
+            .unwrap();
+
+        assert_eq!(Checksum::Value(expected_epoch_checksum), epoch_val.checksum);
+
+        let expected_grand_epoch_checksum = {
+            let mut hasher = Hasher::default();
+            hasher.hash(&BubblegumEpoch::encode_key(epoch_key));
+            hasher.hash(&bincode::serialize(&epoch_val).unwrap());
+            hasher.result().to_bytes()
+        };
+
+        let grand_epoch_key = BubblegumGrandEpochKey::new(tree1, 0);
+        let grand_epoch_val = storage
+            .bubblegum_grand_epochs
+            .get(grand_epoch_key)
+            .unwrap()
+            .unwrap();
+
+        assert_eq!(
+            Checksum::Value(expected_grand_epoch_checksum),
+            grand_epoch_val.checksum
+        );
+    }
+}
diff --git a/rocks-db/Cargo.toml b/rocks-db/Cargo.toml
index 0619089a..63e9ff77 100644
--- a/rocks-db/Cargo.toml
+++ b/rocks-db/Cargo.toml
@@ -41,6 +41,7 @@ usecase = { path = "../usecase" }
 tempfile = { workspace = true }
 bubblegum-batch-sdk = { git = "https://github.com/metaplex-foundation/bubblegum-batch-sdk.git", rev = "0d529f5" }
 num-traits = { workspace = true }
+lazy_static = { workspace = true }
 
 [dev-dependencies]
 rand = { workspace = true }
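The TODO next to the lazy_static block in rocks-db/src/storage_consistency.rs below notes that lazy_static is a stopgap until the toolchain provides std::sync::LazyLock (stable since Rust 1.80). The eventual replacement would look roughly like:

    use std::sync::LazyLock;

    static BUBBLEGUM_EPOCH_CALCULATING_BYTES: LazyLock<Vec<u8>> =
        LazyLock::new(|| bincode::serialize(&BUBBLEGUM_EPOCH_CALCULATING).unwrap());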
diff --git a/rocks-db/src/lib.rs b/rocks-db/src/lib.rs
index aeda7924..5c4f215e 100644
--- a/rocks-db/src/lib.rs
+++ b/rocks-db/src/lib.rs
@@ -4,6 +4,10 @@ use inflector::Inflector;
 use leaf_signatures::LeafSignature;
 use std::sync::atomic::AtomicU64;
 use std::{marker::PhantomData, sync::Arc};
+use storage_consistency::{
+    AccountNftBucket, AccountNftChange, AccountNftGrandBucket, BubblegumChange, BubblegumEpoch,
+    BubblegumGrandEpoch,
+};
 
 use asset::{
     AssetAuthorityDeprecated, AssetCollectionDeprecated, AssetOwnerDeprecated, MetadataMintMap,
@@ -73,6 +77,7 @@ pub mod schedule;
 pub mod sequence_consistent;
 pub mod signature_client;
 pub mod slots_dumper;
+pub mod storage_consistency;
 pub mod storage_traits;
 pub mod token_accounts;
 pub mod token_prices;
@@ -132,6 +137,12 @@ pub struct Storage {
     pub inscription_data: Column<InscriptionData>,
     pub leaf_signature: Column<LeafSignature>,
     pub spl_mints: Column<SplMint>,
+    pub bubblegum_changes: Column<BubblegumChange>,
+    pub bubblegum_epochs: Column<BubblegumEpoch>,
+    pub bubblegum_grand_epochs: Column<BubblegumGrandEpoch>,
+    pub acc_nft_changes: Column<AccountNftChange>,
+    pub acc_nft_epochs: Column<AccountNftBucket>,
+    pub acc_nft_grand_epochs: Column<AccountNftGrandBucket>,
     assets_update_last_seq: AtomicU64,
     join_set: Arc<Mutex<JoinSet<Result<(), JoinError>>>>,
     red_metrics: Arc<RequestErrorDurationMetrics>,
@@ -185,6 +196,12 @@ impl Storage {
         let inscription_data = Self::column(db.clone(), red_metrics.clone());
         let leaf_signature = Self::column(db.clone(), red_metrics.clone());
         let spl_mints = Self::column(db.clone(), red_metrics.clone());
+        let bubblegum_changes = Self::column(db.clone(), red_metrics.clone());
+        let bubblegum_epochs = Self::column(db.clone(), red_metrics.clone());
+        let bubblegum_grand_epochs = Self::column(db.clone(), red_metrics.clone());
+        let acc_nft_changes = Self::column(db.clone(), red_metrics.clone());
+        let acc_nft_epochs = Self::column(db.clone(), red_metrics.clone());
+        let acc_nft_grand_epochs = Self::column(db.clone(), red_metrics.clone());
 
         Self {
             asset_static_data,
@@ -231,6 +248,12 @@ impl Storage {
             inscription_data,
             leaf_signature,
             spl_mints,
+            bubblegum_changes,
+            bubblegum_epochs,
+            bubblegum_grand_epochs,
+            acc_nft_changes,
+            acc_nft_epochs,
+            acc_nft_grand_epochs,
         }
     }
 
@@ -310,6 +333,12 @@ impl Storage {
             Self::new_cf_descriptor::<InscriptionData>(migration_state),
             Self::new_cf_descriptor::<LeafSignature>(migration_state),
             Self::new_cf_descriptor::<SplMint>(migration_state),
+            Self::new_cf_descriptor::<BubblegumChange>(migration_state),
+            Self::new_cf_descriptor::<BubblegumEpoch>(migration_state),
+            Self::new_cf_descriptor::<BubblegumGrandEpoch>(migration_state),
+            Self::new_cf_descriptor::<AccountNftChange>(migration_state),
+            Self::new_cf_descriptor::<AccountNftBucket>(migration_state),
+            Self::new_cf_descriptor::<AccountNftGrandBucket>(migration_state),
         ]
     }
 
@@ -646,6 +675,18 @@ impl Storage {
                     token_accounts::merge_mints,
                 );
             }
+            BubblegumEpoch::NAME => {
+                cf_options.set_merge_operator_associative(
+                    "merge_fn_bubblegum_epoch",
+                    storage_consistency::merge_bubblegum_epoch_checksum,
+                );
+            }
+            BubblegumGrandEpoch::NAME => {
+                cf_options.set_merge_operator_associative(
+                    "merge_fn_bubblegum_grand_epoch",
+                    storage_consistency::merge_bubblegum_grand_epoch_checksum,
+                );
+            }
             _ => {}
         }
         cf_options
diff --git a/rocks-db/src/storage_consistency.rs b/rocks-db/src/storage_consistency.rs
new file mode 100644
index 00000000..b68a4e4c
--- /dev/null
+++ b/rocks-db/src/storage_consistency.rs
@@ -0,0 +1,492 @@
+//! This module contains the core functionality for storing and manipulating
+//! so-called p2p consistency checking data - checksums for bubblegum
+//! and account NFT updates.
+//!
+//! The main idea is that we split the slot "timeline" into so-called epochs
+//! (each epoch is 10 000 slots) and calculate a checksum for each epoch.
+//! 10 epochs shape a grand epoch.
+//!
+//! Later, aura instances can exchange these checksums with each other
+//! to identify whether an instance has missed a portion of changes.
+//!
+//! Bubblegum update:      (tree, slot, seq) => (signature)
+//!                                V
+//! Bubblegum epoch:       (tree, epoch) => (checksum)
+//!                                V
+//! Bubblegum grand epoch: (tree, grand epoch) => (checksum)
+use rocksdb::MergeOperands;
+use serde::{Deserialize, Serialize};
+use solana_sdk::pubkey::Pubkey;
+
+use crate::{column::TypedColumn, transaction::TreeUpdate, Storage};
+
+use std::collections::HashSet;
+use std::sync::atomic::{AtomicU32, Ordering};
+/// Holds the number of the epoch that is happening at the moment.
+/// It does not need to be super precise, i.e. it is OK (and even preferable)
+/// to have a small lag between the real epoch change (current Solana slot)
+/// and the update of this variable.
+// An atomic static, since the value is read and updated from multiple tasks.
+static CURRENT_ESTIMATED_EPOCH: AtomicU32 = AtomicU32::new(0);
+
+pub fn current_estimated_epoch() -> u32 {
+    CURRENT_ESTIMATED_EPOCH.load(Ordering::Relaxed)
+}
+
+pub fn update_estimated_epoch(new_value: u32) {
+    CURRENT_ESTIMATED_EPOCH.store(new_value, Ordering::Relaxed);
+}
+
+pub fn epoch_of_slot(slot: u64) -> u32 {
+    (slot / 10_000) as u32
+}
+
+pub fn grand_epoch_of_slot(slot: u64) -> u16 {
+    (slot / 100_000) as u16
+}
+
+pub fn grand_epoch_of_epoch(epoch: u32) -> u16 {
+    (epoch / 10) as u16
+}
+
+pub fn first_slot_in_epoch(epoch: u32) -> u64 {
+    epoch as u64 * 10_000
+}
+
+pub fn first_epoch_in_grand_epoch(grand_epoch: u16) -> u32 {
+    grand_epoch as u32 * 10
+}
+
+/// Number of slots remaining until the next epoch starts.
+pub fn slots_to_next_epoch(slot: u64) -> u64 {
+    10_000 - (slot % 10_000)
+}
+
+pub const BUBBLEGUM_EPOCH_INVALIDATED: BubblegumEpoch = BubblegumEpoch {
+    checksum: Checksum::Invalidated,
+};
+
+pub const BUBBLEGUM_EPOCH_CALCULATING: BubblegumEpoch = BubblegumEpoch {
+    checksum: Checksum::Calculating,
+};
+
+pub const BUBBLEGUM_GRAND_EPOCH_INVALIDATED: BubblegumGrandEpoch = BubblegumGrandEpoch {
+    checksum: Checksum::Invalidated,
+};
+
+pub const BUBBLEGUM_GRAND_EPOCH_CALCULATING: BubblegumGrandEpoch = BubblegumGrandEpoch {
+    checksum: Checksum::Calculating,
+};
+
+/// Checksum value of a bubblegum epoch / account bucket.
+/// Since the arrival of Solana data is asynchronous and has no strict order guarantees,
+/// we can easily fall into a situation where we are in the process of calculating
+/// a checksum for an epoch, and a new update arrives before the checksum has been written.
+/// ```norun
+///
+///   epoch end        a change for the previous epoch arrived
+///       |                            |
+///       V                            V
+/// ---------------------------------------------------> timeline
+///      ^ \____________ ____________/ ^
+///      |              \/             |
+///     read        calculating      write
+///     all          checksum        epoch
+///    changes                      checksum
+/// ```
+/// To prevent such an inconsistency of a checksum, right before the calculation
+/// we mark the epoch checksum to be calculated as "Calculating",
+/// and after the checksum is calculated, we write this value only
+/// if the previous value is still in the "Calculating" state.
+///
+/// At the same time, when the Bubblegum updates processor receives
+/// a new update with a slot that belongs to the previous epoch period,
+/// it not only writes the bubblegum change, but also updates the
+/// corresponding epoch state to "Invalidated", which prevents
+/// a checksum that might be in the process of calculation
+/// from being written.
+#[derive(Serialize, Deserialize, Debug, Clone, Hash, PartialEq, Eq)]
+pub enum Checksum {
+    Invalidated,
+    Calculating,
+    Value([u8; 32]),
+}
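A few test-style examples make the slot/epoch arithmetic above concrete (hypothetical, for illustration only):

    #[test]
    fn epoch_arithmetic_examples() {
        assert_eq!(epoch_of_slot(10_111), 1); // epochs are 10 000 slots wide
        assert_eq!(grand_epoch_of_slot(10_111), 0); // grand epochs are 100 000 slots wide
        assert_eq!(grand_epoch_of_epoch(17), 1); // 10 epochs per grand epoch
        assert_eq!(first_slot_in_epoch(2), 20_000);
        assert_eq!(slots_to_next_epoch(9_950), 50);
    }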
+/// Key for storing a change detected for the bubblegum contract.
+/// The value is supposed to be a `solana_sdk::signature::Signature`.
+#[derive(Serialize, Deserialize, Debug, Clone, Hash, PartialEq, Eq)]
+pub struct BubblegumChangeKey {
+    pub epoch: u32,
+    pub tree_pubkey: Pubkey,
+    pub slot: u64,
+    pub seq: u64,
+}
+
+impl BubblegumChangeKey {
+    pub fn new(tree_pubkey: Pubkey, slot: u64, seq: u64) -> BubblegumChangeKey {
+        BubblegumChangeKey {
+            epoch: epoch_of_slot(slot),
+            tree_pubkey,
+            slot,
+            seq,
+        }
+    }
+    pub fn epoch_start_key(epoch: u32) -> BubblegumChangeKey {
+        BubblegumChangeKey {
+            epoch,
+            tree_pubkey: Pubkey::from([0u8; 32]),
+            slot: first_slot_in_epoch(epoch),
+            seq: 0,
+        }
+    }
+    pub fn tree_epoch_start_key(tree_pubkey: Pubkey, epoch: u32) -> BubblegumChangeKey {
+        BubblegumChangeKey {
+            epoch,
+            tree_pubkey,
+            slot: first_slot_in_epoch(epoch),
+            seq: 0,
+        }
+    }
+}
+
+#[derive(Serialize, Deserialize, Debug, Clone, Hash, PartialEq, Eq)]
+pub struct BubblegumChange {
+    /// The original signature can be restored as
+    /// `solana_sdk::signature::Signature::from_str(...)`
+    pub signature: String,
+}
+
+impl TypedColumn for BubblegumChange {
+    type KeyType = BubblegumChangeKey;
+    type ValueType = Self;
+    const NAME: &'static str = "BUBBLEGUM_CHANGES";
+
+    fn encode_key(key: Self::KeyType) -> Vec<u8> {
+        // fields are encoded in the order they are defined
+        bincode::serialize(&key).unwrap()
+    }
+
+    fn decode_key(bytes: Vec<u8>) -> crate::Result<Self::KeyType> {
+        let key = bincode::deserialize(&bytes)?;
+        Ok(key)
+    }
+}
+
+#[derive(Serialize, Deserialize, Debug, Clone, Hash, PartialEq, Eq)]
+pub struct BubblegumEpochKey {
+    pub tree_pubkey: Pubkey,
+    pub epoch_num: u32,
+}
+
+impl BubblegumEpochKey {
+    pub fn new(tree_pubkey: Pubkey, epoch_num: u32) -> BubblegumEpochKey {
+        BubblegumEpochKey {
+            tree_pubkey,
+            epoch_num,
+        }
+    }
+    pub fn grand_epoch_start_key(grand_epoch: u16) -> BubblegumEpochKey {
+        BubblegumEpochKey {
+            tree_pubkey: Pubkey::from([0u8; 32]),
+            epoch_num: first_epoch_in_grand_epoch(grand_epoch),
+        }
+    }
+}
+
+#[derive(Serialize, Deserialize, Debug, Clone, Hash, PartialEq, Eq)]
+pub struct BubblegumEpoch {
+    pub checksum: Checksum,
+}
+
+impl From<[u8; 32]> for BubblegumEpoch {
+    fn from(value: [u8; 32]) -> Self {
+        BubblegumEpoch {
+            checksum: Checksum::Value(value),
+        }
+    }
+}
+
+impl TypedColumn for BubblegumEpoch {
+    type KeyType = BubblegumEpochKey;
+    type ValueType = Self;
+    const NAME: &'static str = "BUBBLEGUM_EPOCHS";
+
+    fn encode_key(key: Self::KeyType) -> Vec<u8> {
+        bincode::serialize(&key).unwrap()
+    }
+
+    fn decode_key(bytes: Vec<u8>) -> crate::Result<Self::KeyType> {
+        let key = bincode::deserialize(&bytes)?;
+        Ok(key)
+    }
+}
+
+#[derive(Serialize, Deserialize, Debug, Clone, Hash, PartialEq, Eq)]
+pub struct BubblegumGrandEpochKey {
+    pub tree_pubkey: Pubkey,
+    pub grand_epoch_num: u16,
+}
+
+impl BubblegumGrandEpochKey {
+    pub fn new(tree_pubkey: Pubkey, grand_epoch_num: u16) -> BubblegumGrandEpochKey {
+        BubblegumGrandEpochKey {
+            tree_pubkey,
+            grand_epoch_num,
+        }
+    }
+}
+
+#[derive(Serialize, Deserialize, Debug, Clone, Hash, PartialEq, Eq)]
+pub struct BubblegumGrandEpoch {
+    pub checksum: Checksum,
+}
+
+impl From<[u8; 32]> for BubblegumGrandEpoch {
+    fn from(value: [u8; 32]) -> Self {
+        BubblegumGrandEpoch {
+            checksum: Checksum::Value(value),
+        }
+    }
+}
+
+impl TypedColumn for BubblegumGrandEpoch {
+    type KeyType = BubblegumGrandEpochKey;
+    type ValueType = Self;
+    const NAME: &'static str = "BUBBLEGUM_GRAND_EPOCHS";
+
+    fn encode_key(key: Self::KeyType) -> Vec<u8> {
+        bincode::serialize(&key).unwrap()
+    }
+
+    fn decode_key(bytes: Vec<u8>) -> crate::Result<Self::KeyType> {
+        let key = bincode::deserialize(&bytes)?;
+        Ok(key)
+    }
+}
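A quick round-trip of the bincode key encoding these impls rely on (hypothetical test, not part of the patch):

    #[test]
    fn bubblegum_change_key_roundtrip() {
        let key = BubblegumChangeKey::new(Pubkey::new_unique(), 10_111, 2);
        // `new` derives the epoch from the slot: 10_111 / 10_000 = 1.
        assert_eq!(key.epoch, 1);
        let bytes = BubblegumChange::encode_key(key.clone());
        let decoded = BubblegumChange::decode_key(bytes).unwrap();
        assert_eq!(key, decoded);
    }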
+#[derive(Serialize, Deserialize, Debug, Clone, Hash, PartialEq, Eq)]
+pub struct AccountNftChangeKey {
+    pub slot_number: u64,
+    pub account_pubkey: Pubkey,
+    write_version: u64,
+}
+
+#[derive(Serialize, Deserialize, Debug, Clone, Hash, PartialEq, Eq)]
+pub struct AccountNftChange {
+    pub checksum: Vec<u8>,
+}
+
+impl TypedColumn for AccountNftChange {
+    type KeyType = AccountNftChangeKey;
+    type ValueType = Self;
+    const NAME: &'static str = "ACC_NFT_CHANGES";
+
+    fn encode_key(key: Self::KeyType) -> Vec<u8> {
+        bincode::serialize(&key).unwrap()
+    }
+
+    fn decode_key(bytes: Vec<u8>) -> crate::Result<Self::KeyType> {
+        let key = bincode::deserialize(&bytes)?;
+        Ok(key)
+    }
+}
+
+#[derive(Serialize, Deserialize, Debug, Clone, Hash, PartialEq, Eq)]
+pub struct AccountNftBucketKey {
+    pub epoch_num: u64,
+}
+
+#[derive(Serialize, Deserialize, Debug, Clone, Hash, PartialEq, Eq)]
+pub struct AccountNftBucket {
+    pub checksum: Vec<u8>,
+}
+
+impl TypedColumn for AccountNftBucket {
+    type KeyType = AccountNftBucketKey;
+    type ValueType = Self;
+    const NAME: &'static str = "ACC_NFT_BUCKETS";
+
+    fn encode_key(key: Self::KeyType) -> Vec<u8> {
+        bincode::serialize(&key).unwrap()
+    }
+
+    fn decode_key(bytes: Vec<u8>) -> crate::Result<Self::KeyType> {
+        let key = bincode::deserialize(&bytes)?;
+        Ok(key)
+    }
+}
+
+#[derive(Serialize, Deserialize, Debug, Clone, Hash, PartialEq, Eq)]
+pub struct AccountNftGrandBucketKey {
+    pub epoch_num: u64,
+    pub checksum: Vec<u8>,
+}
+
+#[derive(Serialize, Deserialize, Debug, Clone, Hash, PartialEq, Eq)]
+pub struct AccountNftGrandBucket {
+    pub epoch_num: u64,
+    pub checksum: Vec<u8>,
+}
+
+impl TypedColumn for AccountNftGrandBucket {
+    type KeyType = AccountNftGrandBucketKey;
+    type ValueType = Self;
+    const NAME: &'static str = "ACC_NFT_GRAND_EPOCHS";
+
+    fn encode_key(key: Self::KeyType) -> Vec<u8> {
+        bincode::serialize(&key).unwrap()
+    }
+
+    fn decode_key(bytes: Vec<u8>) -> crate::Result<Self::KeyType> {
+        let key = bincode::deserialize(&bytes)?;
+        Ok(key)
+    }
+}
+
+impl Storage {
+    /// Adds a bubblegum change record to the bubblegum changes column family.
+    /// Checksum calculation/re-calculation is triggered separately, in the ingester module.
+    pub fn track_tree_change_with_batch(
+        &self,
+        batch: &mut rocksdb::WriteBatch,
+        tree_update: &TreeUpdate,
+    ) -> crate::Result<()> {
+        let key =
+            BubblegumChangeKey::new(tree_update.tree, tree_update.slot, tree_update.seq);
+        let value = BubblegumChange {
+            signature: tree_update.tx.clone(),
+        };
+        let _ = self.bubblegum_changes.put_with_batch(batch, key, &value);
+
+        if epoch_of_slot(tree_update.slot) < current_estimated_epoch() {
+            // We invalidate the epoch checksum here, but trigger the checksum recalculation
+            // in another place. Possibly something happens after the checksum is invalidated,
+            // and the re-calculation won't start.
+            // That is acceptable, since it is better to have a clearly invalidated checksum
+            // than a checksum that doesn't reflect the current state.
+            self.invalidate_bubblegum_epoch_with_batch(
+                batch,
+                tree_update.tree,
+                epoch_of_slot(tree_update.slot),
+            );
+        }
+
+        Ok(())
+    }
+
+    fn invalidate_bubblegum_epoch_with_batch(
+        &self,
+        batch: &mut rocksdb::WriteBatch,
+        tree: Pubkey,
+        epoch: u32,
+    ) {
+        let epoch_key = BubblegumEpochKey {
+            tree_pubkey: tree,
+            epoch_num: epoch,
+        };
+        let _ =
+            self.bubblegum_epochs
+                .put_with_batch(batch, epoch_key, &BUBBLEGUM_EPOCH_INVALIDATED);
+
+        if grand_epoch_of_epoch(epoch) < grand_epoch_of_epoch(current_estimated_epoch()) {
+            let grand_epoch_key = BubblegumGrandEpochKey {
+                tree_pubkey: tree,
+                grand_epoch_num: grand_epoch_of_epoch(epoch),
+            };
+            let _ = self.bubblegum_grand_epochs.put_with_batch(
+                batch,
+                grand_epoch_key,
+                &BUBBLEGUM_GRAND_EPOCH_INVALIDATED,
+            );
+        }
+    }
+}
+
+#[async_trait::async_trait]
+pub trait DataConsistencyStorage {
+    async fn drop_forked_bubblegum_changes(&self, changes: &[BubblegumChangeKey]);
+}
+
+#[async_trait::async_trait]
+impl DataConsistencyStorage for Storage {
+    async fn drop_forked_bubblegum_changes(&self, changes: &[BubblegumChangeKey]) {
+        let _ = self.bubblegum_changes.delete_batch(changes.to_vec()).await;
+
+        let mut epochs_to_invalidate = HashSet::new();
+        for change in changes {
+            epochs_to_invalidate.insert(BubblegumEpochKey {
+                tree_pubkey: change.tree_pubkey,
+                epoch_num: epoch_of_slot(change.slot),
+            });
+        }
+
+        let mut grand_epochs_to_invalidate = HashSet::new();
+        for epoch_to_invalidate in epochs_to_invalidate {
+            let _ = self.bubblegum_epochs.put(
+                epoch_to_invalidate.clone(),
+                BUBBLEGUM_EPOCH_INVALIDATED.clone(),
+            );
+            grand_epochs_to_invalidate.insert(BubblegumGrandEpochKey {
+                tree_pubkey: epoch_to_invalidate.tree_pubkey,
+                grand_epoch_num: grand_epoch_of_epoch(epoch_to_invalidate.epoch_num),
+            });
+        }
+
+        for grand_epoch_to_invalidate in grand_epochs_to_invalidate {
+            let _ = self.bubblegum_grand_epochs.put(
+                grand_epoch_to_invalidate,
+                BUBBLEGUM_GRAND_EPOCH_INVALIDATED.clone(),
+            );
+        }
+    }
+}
+
+// TODO: Replace with LazyLock after a rustc update.
+lazy_static::lazy_static! {
+    pub static ref BUBBLEGUM_EPOCH_CALCULATING_BYTES: Vec<u8> = bincode::serialize(&BUBBLEGUM_EPOCH_CALCULATING).unwrap();
+    pub static ref BUBBLEGUM_GRAND_EPOCH_CALCULATING_BYTES: Vec<u8> = bincode::serialize(&BUBBLEGUM_GRAND_EPOCH_CALCULATING).unwrap();
+}
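To illustrate the state machine described in the comment below, here is the operators' decision rule extracted into a plain function, with the two interesting cases (a hypothetical sketch: &MergeOperands cannot be constructed by hand, so the rule is simulated rather than calling the real operator):

    // The rule merge_bubblegum_epoch_checksum applies to the existing value:
    fn next_value(existing: &[u8], new_checksum: &[u8]) -> Vec<u8> {
        if existing == BUBBLEGUM_EPOCH_CALCULATING_BYTES.as_slice() {
            new_checksum.to_vec() // nothing interfered: store the checksum
        } else {
            existing.to_vec() // e.g. Invalidated: a late update won, drop the checksum
        }
    }

    #[test]
    fn merge_rule_examples() {
        let calculating = BUBBLEGUM_EPOCH_CALCULATING_BYTES.clone();
        let invalidated = bincode::serialize(&BUBBLEGUM_EPOCH_INVALIDATED).unwrap();
        let checksum = bincode::serialize(&BubblegumEpoch::from([7u8; 32])).unwrap();

        // Normal flow: put(Calculating) -> compute -> merge(checksum) stores the checksum.
        assert_eq!(next_value(&calculating, &checksum), checksum);
        // A late change invalidated the epoch mid-calculation: the checksum is discarded.
        assert_eq!(next_value(&invalidated, &checksum), invalidated);
    }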
+/// This merge should be used only for setting a calculated checksum.
+/// The thing is that while we calculate the checksum for tree signatures in a given epoch,
+/// it is possible that in parallel we receive another update for this tree in this epoch.
+/// To not miss this fact, we calculate the tree epoch checksum in the following way:
+/// 1) Set the tree epoch as Calculating
+/// 2) Calculate the checksum
+/// 3) Update the tree epoch with the calculated checksum, only if the previous value is Calculating
+/// This works in conjunction with the bubblegum_updates_processor, which sets the tree epoch value
+/// to Invalidated after each tree update.
+/// That's why the checksum calculator is able to store the checksum only if no updates have been
+/// received during the calculation (otherwise the status will be Invalidated, not Calculating).
+pub(crate) fn merge_bubblegum_epoch_checksum(
+    _new_key: &[u8],
+    existing_val: Option<&[u8]>,
+    operands: &MergeOperands,
+) -> Option<Vec<u8>> {
+    if let Some(v) = existing_val {
+        if v == BUBBLEGUM_EPOCH_CALCULATING_BYTES.as_slice() {
+            if let Some(op) = operands.into_iter().next() {
+                return Some(op.to_vec());
+            }
+        }
+        Some(v.to_vec())
+    } else {
+        None
+    }
+}
+
+pub(crate) fn merge_bubblegum_grand_epoch_checksum(
+    _new_key: &[u8],
+    existing_val: Option<&[u8]>,
+    operands: &MergeOperands,
+) -> Option<Vec<u8>> {
+    if let Some(v) = existing_val {
+        if v == BUBBLEGUM_GRAND_EPOCH_CALCULATING_BYTES.as_slice() {
+            if let Some(op) = operands.into_iter().next() {
+                return Some(op.to_vec());
+            }
+        }
+        Some(v.to_vec())
+    } else {
+        None
+    }
+}
diff --git a/rocks-db/src/transaction.rs b/rocks-db/src/transaction.rs
index 6f98ac42..9fe68050 100644
--- a/rocks-db/src/transaction.rs
+++ b/rocks-db/src/transaction.rs
@@ -84,6 +84,8 @@ pub struct AssetUpdate<T> {
     pub pk: Pubkey,
     pub details: T,
 }
+
+/// Don't be confused by the name: this type is for Bubblegum instructions only.
 #[derive(Clone, Default)]
 pub struct InstructionResult {
     pub update: Option,
diff --git a/rocks-db/src/transaction_client.rs b/rocks-db/src/transaction_client.rs
index 70fbafea..0817421f 100644
--- a/rocks-db/src/transaction_client.rs
+++ b/rocks-db/src/transaction_client.rs
@@ -161,6 +161,7 @@ impl Storage {
             self.save_tree_with_batch(batch, tree_update);
             self.save_asset_signature_with_batch(batch, tree_update);
             self.save_leaf_signature_with_batch(batch, tree_update)?;
+            self.track_tree_change_with_batch(batch, tree_update)?; // for p2p consistency
         }
 
         Ok(())
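End to end, every bubblegum tree update now also records a change row in the same RocksDB write batch, which is what the checksum calculator later iterates over. A rough usage sketch (hypothetical: the TreeUpdate field set, its Default impl, and the `db` handle on Storage are assumptions, not shown in this patch):

    let tree_update = TreeUpdate {
        tree,         // Pubkey of the bubblegum tree
        slot: 10_111, // falls into epoch 1
        seq: 2,
        tx: signature.to_string(),
        ..Default::default() // hypothetical: remaining TreeUpdate fields elided
    };
    let mut batch = rocksdb::WriteBatch::default();
    storage.track_tree_change_with_batch(&mut batch, &tree_update)?;
    // ...the other *_with_batch calls from transaction_client.rs...
    storage.db.write(batch)?; // assumes Storage exposes its DB handle as `db`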