From 23cb90a75237b83043a427af1cce3582b55ae101 Mon Sep 17 00:00:00 2001 From: snorochevskiy Date: Thu, 31 Oct 2024 15:36:25 +0200 Subject: [PATCH] Adding tracking of nft changes --- entities/src/models.rs | 18 + nft_ingester/benches/ingester_benchmark.rs | 1 + nft_ingester/src/accounts_processor.rs | 16 +- nft_ingester/src/bin/ingester/main.rs | 19 +- nft_ingester/src/bin/raw_backfiller/main.rs | 16 + .../src/bubblegum_updates_processor.rs | 21 +- nft_ingester/src/consistency_calculator.rs | 499 +++++++++++++++--- nft_ingester/src/fork_cleaner.rs | 15 + nft_ingester/src/lib.rs | 1 - nft_ingester/src/solana_slot_service.rs | 50 -- nft_ingester/tests/batch_mint_test.rs | 1 + nft_ingester/tests/bubblegum_tests.rs | 2 + nft_ingester/tests/clean_forks_test.rs | 2 + .../tests/consistency_calculator_test.rs | 34 +- nft_ingester/tests/decompress.rs | 1 + rocks-db/src/storage_consistency.rs | 46 +- 16 files changed, 581 insertions(+), 161 deletions(-) delete mode 100644 nft_ingester/src/solana_slot_service.rs diff --git a/entities/src/models.rs b/entities/src/models.rs index 91e2a1b2..8d1fb17b 100644 --- a/entities/src/models.rs +++ b/entities/src/models.rs @@ -597,6 +597,24 @@ pub struct UnprocessedAccountMessage { pub id: String, } +impl UnprocessedAccountMessage { + pub fn solana_change_info(&self) -> (Pubkey, u64, u64) { + let (slot, write_version) = match &self.account { + UnprocessedAccount::MetadataInfo(v) => (v.slot_updated, v.write_version), + UnprocessedAccount::Token(v) => (v.slot_updated as u64, v.write_version), + UnprocessedAccount::Mint(v) => (v.slot_updated as u64, v.write_version), + UnprocessedAccount::Edition(v) => (v.slot_updated, v.write_version), + UnprocessedAccount::BurnMetadata(v) => (v.slot_updated, v.write_version), + UnprocessedAccount::BurnMplCore(v) => (v.slot_updated, v.write_version), + UnprocessedAccount::MplCore(v) => (v.slot_updated, v.write_version), + UnprocessedAccount::Inscription(v) => (v.slot_updated, v.write_version), + UnprocessedAccount::InscriptionData(v) => (v.slot_updated, v.write_version), + UnprocessedAccount::MplCoreFee(v) => (v.slot_updated, v.write_version), + }; + (self.key, slot, write_version) + } +} + pub struct BufferedTxWithID { pub tx: BufferedTransaction, pub id: String, diff --git a/nft_ingester/benches/ingester_benchmark.rs b/nft_ingester/benches/ingester_benchmark.rs index a6a06f0a..f3477ab0 100644 --- a/nft_ingester/benches/ingester_benchmark.rs +++ b/nft_ingester/benches/ingester_benchmark.rs @@ -35,6 +35,7 @@ async fn bench_ingest( rocks_dest.clone(), Arc::new(IngesterMetricsConfig::new()), buffer.json_tasks.clone(), + None, )); let tx_ingester = Arc::new(transaction_ingester::BackfillTransactionIngester::new( diff --git a/nft_ingester/src/accounts_processor.rs b/nft_ingester/src/accounts_processor.rs index c2594032..94330619 100644 --- a/nft_ingester/src/accounts_processor.rs +++ b/nft_ingester/src/accounts_processor.rs @@ -1,3 +1,4 @@ +use crate::consistency_calculator::NftChangesTracker; use crate::error::IngesterError; use crate::inscriptions_processor::InscriptionsProcessor; use crate::mpl_core_fee_indexing_processor::MplCoreFeeProcessor; @@ -38,6 +39,7 @@ pub async fn run_accounts_processor, postgre_client: Arc, rpc_client: Arc, + nft_changes_tracker: Arc, join_set: Arc>>>, ) { mutexed_tasks.lock().await.spawn(async move { @@ -54,7 +56,7 @@ pub async fn run_accounts_processor AccountsProcessor { rx: Receiver<()>, storage: Arc, accounts_batch_size: usize, + nft_changes_tracker: Arc, ) { let mut batch_storage = 
BatchSaveStorage::new(storage, accounts_batch_size, self.metrics.clone()); @@ -133,7 +136,7 @@ impl AccountsProcessor { continue; } }; - self.process_account(&mut batch_storage, unprocessed_accounts, &mut core_fees, &mut ack_ids, &mut interval, &mut batch_fill_instant).await; + self.process_account(&mut batch_storage, unprocessed_accounts, &mut core_fees, &mut ack_ids, &mut interval, &mut batch_fill_instant, &nft_changes_tracker).await; }, _ = interval.tick() => { self.flush(&mut batch_storage, &mut ack_ids, &mut interval, &mut batch_fill_instant); @@ -160,6 +163,7 @@ impl AccountsProcessor { ack_ids: &mut Vec, interval: &mut tokio::time::Interval, batch_fill_instant: &mut Instant, + nft_changes_tracker: &NftChangesTracker, ) { for unprocessed_account in unprocessed_accounts { let processing_result = match &unprocessed_account.account { @@ -227,7 +231,13 @@ impl AccountsProcessor { error!("Processing account {}: {}", unprocessed_account.key, err); continue; } - // TODO: write account change + { + let (account_pubkey, slot, write_version) = + unprocessed_account.solana_change_info(); + nft_changes_tracker + .track_account_change(account_pubkey, slot, write_version) + .await; + } self.metrics .inc_accounts(unprocessed_account.account.into()); ack_ids.push(unprocessed_account.id); diff --git a/nft_ingester/src/bin/ingester/main.rs b/nft_ingester/src/bin/ingester/main.rs index 803642c2..b3c1612e 100644 --- a/nft_ingester/src/bin/ingester/main.rs +++ b/nft_ingester/src/bin/ingester/main.rs @@ -17,7 +17,7 @@ use plerkle_messenger::ConsumptionType; use pprof::ProfilerGuardBuilder; use rocks_db::bubblegum_slots::{BubblegumSlotGetter, IngestableSlotGetter}; use solana_client::nonblocking::rpc_client::RpcClient; -use tokio::sync::{broadcast, Mutex}; +use tokio::sync::{broadcast, mpsc, Mutex}; use tokio::task::JoinSet; use tokio::time::sleep as tokio_sleep; use tracing::{error, info, warn}; @@ -42,6 +42,9 @@ use nft_ingester::buffer::{debug_buffer, Buffer}; use nft_ingester::config::{ setup_config, ApiConfig, BackfillerConfig, BackfillerMode, IngesterConfig, MessageSource, INGESTER_CONFIG_PREFIX, }; +use nft_ingester::consistency_calculator; +use nft_ingester::consistency_calculator::NftChangesTracker; +use nft_ingester::consistency_calculator::NTF_CHANGES_NOTIFICATION_QUEUE_SIZE; use nft_ingester::fork_cleaner::{run_fork_cleaner, ForkCleaner}; use nft_ingester::gapfiller::{process_asset_details_stream_wrapper, run_sequence_consistent_gapfiller}; use nft_ingester::index_syncronizer::Synchronizer; @@ -185,6 +188,14 @@ pub async fn main() -> Result<(), IngesterError> { } let rpc_client = Arc::new(RpcClient::new(config.rpc_host.clone())); + let (nft_change_snd, nft_change_rcv) = mpsc::channel(NTF_CHANGES_NOTIFICATION_QUEUE_SIZE); + let changes_tracker = Arc::new(NftChangesTracker::new(primary_rocks_storage.clone(), nft_change_snd.clone())); + consistency_calculator::run_bg_consistency_calculator( + nft_change_rcv, + primary_rocks_storage.clone(), + shutdown_rx.resubscribe(), + ); + for _ in 0..config.accounts_parsing_workers { match config.message_source { MessageSource::Redis => { @@ -206,6 +217,7 @@ pub async fn main() -> Result<(), IngesterError> { metrics_state.ingester_metrics.clone(), index_pg_storage.clone(), rpc_client.clone(), + changes_tracker.clone(), mutexed_tasks.clone(), ) .await; @@ -221,6 +233,7 @@ pub async fn main() -> Result<(), IngesterError> { metrics_state.ingester_metrics.clone(), index_pg_storage.clone(), rpc_client.clone(), + changes_tracker.clone(), mutexed_tasks.clone(), ) 
.await; @@ -240,6 +253,7 @@ pub async fn main() -> Result<(), IngesterError> { metrics_state.ingester_metrics.clone(), index_pg_storage.clone(), rpc_client.clone(), + changes_tracker.clone(), mutexed_tasks.clone(), ) .await; @@ -363,6 +377,7 @@ pub async fn main() -> Result<(), IngesterError> { primary_rocks_storage.clone(), metrics_state.ingester_metrics.clone(), buffer.json_tasks.clone(), + Some(changes_tracker.clone()), )); for _ in 0..config.transactions_parsing_workers { @@ -407,6 +422,7 @@ pub async fn main() -> Result<(), IngesterError> { primary_rocks_storage.clone(), metrics_state.ingester_metrics.clone(), buffer.json_tasks.clone(), + Some(changes_tracker.clone()), )); let tx_ingester = Arc::new(BackfillTransactionIngester::new(backfill_bubblegum_updates_processor.clone())); let backfiller_config = setup_config::(INGESTER_CONFIG_PREFIX); @@ -689,6 +705,7 @@ pub async fn main() -> Result<(), IngesterError> { primary_rocks_storage.clone(), primary_rocks_storage.clone(), primary_rocks_storage.clone(), + Some(changes_tracker.clone()), metrics_state.fork_cleaner_metrics.clone(), ); let rx = shutdown_rx.resubscribe(); diff --git a/nft_ingester/src/bin/raw_backfiller/main.rs b/nft_ingester/src/bin/raw_backfiller/main.rs index 1c1b3d26..297f1c6f 100644 --- a/nft_ingester/src/bin/raw_backfiller/main.rs +++ b/nft_ingester/src/bin/raw_backfiller/main.rs @@ -6,6 +6,9 @@ use nft_ingester::buffer::Buffer; use nft_ingester::config::{ self, init_logger, setup_config, BackfillerConfig, RawBackfillConfig, INGESTER_CONFIG_PREFIX, }; +use nft_ingester::consistency_calculator; +use nft_ingester::consistency_calculator::NftChangesTracker; +use nft_ingester::consistency_calculator::NTF_CHANGES_NOTIFICATION_QUEUE_SIZE; use nft_ingester::error::IngesterError; use nft_ingester::init::graceful_stop; use nft_ingester::transaction_ingester; @@ -19,6 +22,7 @@ use metrics_utils::{BackfillerMetricsConfig, IngesterMetricsConfig}; use rocks_db::bubblegum_slots::BubblegumSlotGetter; use rocks_db::migrator::MigrationState; use rocks_db::Storage; +use tokio::sync::mpsc; use tokio::sync::{broadcast, Mutex}; use tokio::task::JoinSet; @@ -139,6 +143,17 @@ pub async fn main() -> Result<(), IngesterError> { ); let (shutdown_tx, shutdown_rx) = broadcast::channel::<()>(1); + let (nft_change_snd, nft_change_rcv) = mpsc::channel(NTF_CHANGES_NOTIFICATION_QUEUE_SIZE); + let changes_tracker = Arc::new(NftChangesTracker::new( + rocks_storage.clone(), + nft_change_snd.clone(), + )); + consistency_calculator::run_bg_consistency_calculator( + nft_change_rcv, + rocks_storage.clone(), + shutdown_rx.resubscribe(), + ); + match backfiller_config.backfiller_mode { config::BackfillerMode::IngestDirectly => { todo!(); @@ -174,6 +189,7 @@ pub async fn main() -> Result<(), IngesterError> { rocks_storage.clone(), ingester_metrics.clone(), buffer.json_tasks.clone(), + Some(changes_tracker.clone()), )); let tx_ingester = Arc::new(transaction_ingester::BackfillTransactionIngester::new( diff --git a/nft_ingester/src/bubblegum_updates_processor.rs b/nft_ingester/src/bubblegum_updates_processor.rs index 2061a536..a49b280b 100644 --- a/nft_ingester/src/bubblegum_updates_processor.rs +++ b/nft_ingester/src/bubblegum_updates_processor.rs @@ -1,3 +1,4 @@ +use crate::consistency_calculator::NftChangesTracker; use crate::error::IngesterError; use crate::flatbuffer_mapper::FlatbufferMapper; use crate::plerkle; @@ -60,6 +61,7 @@ pub struct BubblegumTxProcessor { pub transaction_parser: Arc, pub instruction_parser: Arc, pub rocks_client: Arc, + pub 
nft_change_tracker: Option>, pub json_tasks: Arc>>, pub metrics: Arc, @@ -70,11 +72,13 @@ impl BubblegumTxProcessor { rocks_client: Arc, metrics: Arc, json_tasks: Arc>>, + nft_change_tracker: Option>, ) -> Self { BubblegumTxProcessor { transaction_parser: Arc::new(FlatbufferMapper {}), instruction_parser: Arc::new(BubblegumParser {}), rocks_client, + nft_change_tracker, json_tasks, metrics, } @@ -1173,8 +1177,23 @@ impl BubblegumTxProcessor { Ok(()) } + /// Checks whether the given instruction is a late instruction, i.e. belongs to a previous epoch, + /// and emits a notification that will force the checksum calculator component + /// to recalculate the epoch checksum. + /// + /// Note: this function only sends the notification; the saving of bubblegum changes + /// happens in [Storage::store_instruction_result_with_batch] by calling + /// [Storage::track_tree_change_with_batch] async fn calc_checksums_if_needed(&self, instructions: &[InstructionResult]) { - // TODO: if there was a change with a slot from the previous epoch, **emit** checksum recalc + if let Some(nft_change_tracker) = self.nft_change_tracker.as_ref() { + for ix in instructions { + if let Some(tree_update) = ix.tree_update.as_ref() { + nft_change_tracker + .watch_bubblegum_change(tree_update.tree, tree_update.slot) + .await; + } + } + } } } diff --git a/nft_ingester/src/consistency_calculator.rs b/nft_ingester/src/consistency_calculator.rs index 7622d89a..82ff19d9 100644 --- a/nft_ingester/src/consistency_calculator.rs +++ b/nft_ingester/src/consistency_calculator.rs @@ -1,54 +1,251 @@ +//! This module contains functionality for calculating the transaction-based +//! and account-based NFT checksums that are used in peer-to-peer +//! communication between Aura nodes to identify missing data on a node.
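(Aside: a minimal sketch of the slot-to-epoch arithmetic this module relies on. The real definitions live in `rocks_db::storage_consistency`; the 10,000-slot epoch size and the `SLOTS_PER_EPOCH` name are assumptions inferred from the test later in this patch, where slot 100_001 maps to epoch 10.)

```rust
// Hypothetical sketch; the actual implementation lives in rocks_db::storage_consistency.
// SLOTS_PER_EPOCH is an assumed name, and 10_000 is inferred from the test
// where slot 100_001 falls into epoch 10.
const SLOTS_PER_EPOCH: u64 = 10_000;

fn epoch_of_slot(slot: u64) -> u32 {
    (slot / SLOTS_PER_EPOCH) as u32
}

fn first_slot_in_epoch(epoch: u32) -> u64 {
    epoch as u64 * SLOTS_PER_EPOCH
}

fn main() {
    assert_eq!(epoch_of_slot(90_001), 9);
    assert_eq!(epoch_of_slot(100_001), 10);
    assert_eq!(first_slot_in_epoch(10), 100_000);
}
```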
+ use rocks_db::{ column::TypedColumn, storage_consistency::{ - self, bucket_for_acc, grand_bucket_for_bucket, grand_epoch_of_epoch, AccountNft, - AccountNftBucket, AccountNftBucketKey, AccountNftChange, AccountNftChangeKey, + self, bucket_for_acc, epoch_of_slot, grand_bucket_for_bucket, grand_epoch_of_epoch, + AccountNft, AccountNftBucket, AccountNftBucketKey, AccountNftChange, AccountNftChangeKey, AccountNftGrandBucket, AccountNftGrandBucketKey, AccountNftKey, BubblegumChange, BubblegumChangeKey, BubblegumEpoch, BubblegumEpochKey, BubblegumGrandEpoch, BubblegumGrandEpochKey, ACC_BUCKET_INVALIDATE, ACC_GRAND_BUCKET_INVALIDATE, + BUBBLEGUM_GRAND_EPOCH_INVALIDATED, }, Storage, }; use solana_sdk::{hash::Hasher, pubkey::Pubkey}; -use std::{collections::HashSet, sync::Arc, time::Duration}; +use std::{ + collections::{BTreeSet, HashSet}, + sync::Arc, + time::Duration, +}; use storage_consistency::{BUBBLEGUM_EPOCH_CALCULATING, BUBBLEGUM_GRAND_EPOCH_CALCULATING}; -use tokio::{sync::mpsc::UnboundedReceiver, task::JoinHandle}; +use tokio::sync::{ + mpsc::{Receiver, Sender}, + Mutex, +}; + +pub const NTF_CHANGES_NOTIFICATION_QUEUE_SIZE: usize = 1000; +/// Wait this number of seconds for late data before starting to calculate the epoch checksum +const EPOCH_CALC_LAG_SEC: u64 = 300; + +/// This message is used to send notifications about changes from: +/// - bubblegum processor +/// - account processor +/// - fork cleaner +#[derive(Debug, PartialEq, Eq)] pub enum ConsistencyCalcMsg { - EpochChanged { epoch: u32 }, + StartingBackfilling, + FinishedBackfilling, + EpochChanged { new_epoch: u32 }, BubblegumUpdated { tree: Pubkey, slot: u64 }, AccUpdated { account: Pubkey, slot: u64 }, } +/// Component for conveniently storing account NFT changes, +/// and for notifying the checksum calculator when a whole epoch, +/// or an individual bubblegum tree/account checksum, should be calculated. +pub struct NftChangesTracker { + storage: Arc, + sender: Sender, +} + +impl NftChangesTracker { + pub fn new(storage: Arc, sender: Sender) -> NftChangesTracker { + NftChangesTracker { storage, sender } + } + + /// Persists the given account NFT change into the storage and, if the change is from an epoch + /// previous to the current one, also notifies the checksum calculator + /// about late data.
+ /// + /// ## Args: + /// * `account_pubkey` - Pubkey of the NFT account + /// * `slot` - the slot number the change was made in + /// * `write_version` - write version of the change + pub async fn track_account_change( + &self, + account_pubkey: Pubkey, + slot: u64, + write_version: u64, + ) { + let epoch = epoch_of_slot(slot); + let key = AccountNftChangeKey { + epoch, + account_pubkey, + slot, + write_version, + }; + let value = AccountNftChange {}; + + let last_slot = storage_consistency::track_slot_counter(slot); + let last_slot_epoch = epoch_of_slot(last_slot); + + if epoch < last_slot_epoch { + let bucket = bucket_for_acc(account_pubkey); + let grand_bucket = grand_bucket_for_bucket(bucket); + let _ = self + .storage + .acc_nft_grand_buckets + .put_async( + AccountNftGrandBucketKey::new(grand_bucket), + ACC_GRAND_BUCKET_INVALIDATE, + ) + .await; + let _ = self + .storage + .acc_nft_buckets + .put_async(AccountNftBucketKey::new(bucket), ACC_BUCKET_INVALIDATE) + .await; + } + + let _ = self.storage.acc_nft_changes.put_async(key, value).await; + + if epoch < last_slot_epoch { + let _ = self + .sender + .send(ConsistencyCalcMsg::AccUpdated { + account: account_pubkey, + slot, + }) + .await; + } else if epoch > last_slot_epoch && last_slot != 0 { + let _ = self + .sender + .send(ConsistencyCalcMsg::EpochChanged { new_epoch: epoch }) + .await; + } + } + + /// Checks the bubblegum tree slot and, if the slot number is from an epoch previous to the + /// current one, emits a notification to the checksum calculator. + /// + /// In contrast to the account tracking method, for bubblegum we don't + /// store the tree change here, since it is stored inside [rocks_db::transaction_client] + /// as part of the same batch that persists the Bubblegum tree change details. + pub async fn watch_bubblegum_change(&self, tree: Pubkey, slot: u64) { + let epoch = epoch_of_slot(slot); + let last_slot = storage_consistency::track_slot_counter(slot); + let last_slot_epoch = epoch_of_slot(last_slot); + if epoch < last_slot_epoch { + let _ = self + .sender + .send(ConsistencyCalcMsg::BubblegumUpdated { tree, slot }) + .await; + } else if epoch > last_slot_epoch && last_slot != 0 { + let _ = self + .sender + .send(ConsistencyCalcMsg::EpochChanged { new_epoch: epoch }) + .await; + } + } + + /// Iterates over bubblegum changes and for each of them checks whether the change is from an + /// epoch previous to the current one. If so, it sends a notification to the checksum calculator. + /// This method is called from the fork cleaner. + pub async fn watch_remove_forked_bubblegum_changes(&self, keys: &[BubblegumChangeKey]) { + let last_slot = storage_consistency::last_tracked_slot(); + let last_slot_epoch = epoch_of_slot(last_slot); + for key in keys { + if key.epoch < last_slot_epoch { + let _ = self + .sender + .send(ConsistencyCalcMsg::BubblegumUpdated { + tree: key.tree_pubkey, + slot: key.slot, + }) + .await; + } + } + } +} + /// An entry point for checksums calculation component. /// Should be called from "main". -pub async fn run_bg_consistency_calculator( - mut rcv: UnboundedReceiver, +/// Accepts notifications about epoch change, or changes in a specific bubblegum tree or account, +/// and schedules checksum calculation.
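(Aside: to make the wiring of this entry point concrete, here is a minimal sketch mirroring the changes this patch makes in `ingester/main.rs`: one bounded channel connects the `NftChangesTracker` (producer side, cloned into every processor) to the background calculator (consumer side). The helper name `spawn_consistency_calculator` is hypothetical.)

```rust
use std::sync::Arc;

use nft_ingester::consistency_calculator::{
    self, NftChangesTracker, NTF_CHANGES_NOTIFICATION_QUEUE_SIZE,
};
use rocks_db::Storage;
use tokio::sync::{broadcast, mpsc};

/// Hypothetical helper that mirrors the wiring added to ingester/main.rs.
fn spawn_consistency_calculator(
    storage: Arc<Storage>,
    shutdown_rx: &broadcast::Receiver<()>,
) -> Arc<NftChangesTracker> {
    // Bounded queue: processors send ConsistencyCalcMsg values into it...
    let (nft_change_snd, nft_change_rcv) = mpsc::channel(NTF_CHANGES_NOTIFICATION_QUEUE_SIZE);
    let changes_tracker = Arc::new(NftChangesTracker::new(storage.clone(), nft_change_snd));
    // ...and the background task drains it, scheduling checksum calculations.
    consistency_calculator::run_bg_consistency_calculator(
        nft_change_rcv,
        storage,
        shutdown_rx.resubscribe(),
    );
    changes_tracker
}
```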
+pub fn run_bg_consistency_calculator( + mut rcv: Receiver, storage: Arc, + mut shutdown_signal: tokio::sync::broadcast::Receiver<()>, ) { tokio::spawn(async move { - let mut _bubblegum_task = None; + let bbgm_tasks: Arc>> = Arc::new(Mutex::new(BTreeSet::new())); + let acc_tasks: Arc>> = Arc::new(Mutex::new(BTreeSet::new())); + + // Task that calculates bubblegum checksums + let _bbgm_bg = { + let storage = storage.clone(); + let bbgm_tasks = bbgm_tasks.clone(); + tokio::spawn(async move { + process_bbgm_tasks(storage, bbgm_tasks).await; + }) + }; + // Task that calculates account NFT checksums + let _acc_bg = { + let storage = storage.clone(); + let acc_tasks = acc_tasks.clone(); + tokio::spawn(async move { + process_acc_tasks(storage, acc_tasks).await; + }) + }; - // Don't look here too much for now, it is to be implemented loop { - match rcv.recv().await { + let calc_msg = tokio::select! { + msg = rcv.recv() => msg, + _ = shutdown_signal.recv() => { + tracing::info!("Received stop signal, stopping consistency calculator"); + break; + } + }; + + match calc_msg { Some(msg) => match msg { - ConsistencyCalcMsg::EpochChanged { epoch } => { - // TODO: check sequnce_consistent.rs before calculating the checksums - - // TODO: Scheduler epoch calc. Should calc immediately or wait, since more data can come? - let t = schedule_bublegum_calc( - Duration::from_secs(300), - storage.clone(), - epoch, - ); - _bubblegum_task = Some(t); + ConsistencyCalcMsg::EpochChanged { new_epoch } => { + let prev_epoch = new_epoch.saturating_sub(1); + { + // We don't wait for the gap filler (sequence_consistent.rs) to process + // slots up to the last one in the epoch; we'll just recalculate later if needed. + let mut guard = bbgm_tasks.lock().await; + guard.insert(BbgmTask::CalcEpoch(prev_epoch)); + } + { + let mut guard = acc_tasks.lock().await; + guard.insert(AccTask::CalcEpoch(prev_epoch)); + } + } + ConsistencyCalcMsg::BubblegumUpdated { tree, slot } => { + let mut guard = bbgm_tasks.lock().await; + guard.insert(BbgmTask::CalcTree(epoch_of_slot(slot), tree)); + } + ConsistencyCalcMsg::AccUpdated { account: _, slot } => { + // It's actually more reasonable to just process all late changes + let mut guard = acc_tasks.lock().await; + guard.insert(AccTask::CalcEpoch(epoch_of_slot(slot))); + } + ConsistencyCalcMsg::StartingBackfilling => { + { + let mut guard = bbgm_tasks.lock().await; + guard.insert(BbgmTask::Suspend); + } + { + let mut guard = acc_tasks.lock().await; + guard.insert(AccTask::Suspend); + } + } + ConsistencyCalcMsg::FinishedBackfilling => { + { + let mut guard = bbgm_tasks.lock().await; + guard.insert(BbgmTask::Resume); + } + { + let mut guard = acc_tasks.lock().await; + guard.insert(AccTask::Resume); + } } - ConsistencyCalcMsg::BubblegumUpdated { tree: _, slot: _ } => todo!(), - ConsistencyCalcMsg::AccUpdated { - account: _, - slot: _, - } => todo!(), }, None => break, } @@ -56,27 +253,100 @@ pub async fn run_bg_consistency_calculator( }); } -/// After a given lag, launches checksum calculation for the given epoch. -/// -/// ## Args: -/// * `start_lag` - time to wait before actual calculation. -/// Required because we might want to wait for late updates. -/// * `storage` - database -/// * `epoch` - the epoch the calculation should be done for -fn schedule_bublegum_calc( - start_lag: Duration, - storage: Arc, - epoch: u32, -) -> JoinHandle<()> { - tokio::spawn(async move { - tokio::time::sleep(start_lag).await; - calc_bubblegum_checksums(&storage, epoch).await; - }) +/// Variant order matters!
+/// We want whole epochs to be calculated before individual tree epochs from late changes. +#[derive(Debug, PartialEq, Eq, PartialOrd, Ord)] +enum BbgmTask { + /// Suspend checksum calculation, e.g. before backfilling + Suspend, + /// Resume checksum calculation (backfilling is finished) + Resume, + /// Calculate checksums for all bubblegum trees in the given epoch + CalcEpoch(u32), + /// Calculate checksums only for the given bubblegum tree in the given epoch + CalcTree(u32, Pubkey), } -pub async fn calc_bubblegum_checksums(storage: &Storage, epoch: u32) { - calc_bubblegum_epoch(&storage, epoch).await; - calc_bubblegum_grand_epoch(&storage, grand_epoch_of_epoch(epoch)).await; +#[derive(Debug, PartialEq, Eq, PartialOrd, Ord)] +enum AccTask { + Suspend, + Resume, + CalcEpoch(u32), +} + +async fn process_bbgm_tasks(storage: Arc, tasks: Arc>>) { + let mut is_suspended = false; + loop { + if is_suspended { + let guard = tasks.lock().await; + if guard + .first() + .map(|t| *t != BbgmTask::Resume) + .unwrap_or(true) + { + tokio::time::sleep(Duration::from_secs(10)).await; + continue; + } + } + let maybe_task = { + let mut guard = tasks.lock().await; + guard.pop_first() + }; + match maybe_task { + Some(BbgmTask::CalcEpoch(epoch)) => { + tokio::time::sleep(Duration::from_secs(EPOCH_CALC_LAG_SEC)).await; + tracing::info!("Calculating Bubblegum checksum epoch: {epoch}"); + calc_bubblegum_checksums(&storage, epoch, None).await; + tracing::info!("Finished calculating Bubblegum checksum epoch: {epoch}"); + } + Some(BbgmTask::CalcTree(epoch, tree)) => { + calc_bubblegum_checksums(&storage, epoch, Some(tree)).await + } + Some(BbgmTask::Suspend) => is_suspended = true, + Some(BbgmTask::Resume) => is_suspended = false, + None => tokio::time::sleep(Duration::from_secs(10)).await, + }; + } +} + +async fn process_acc_tasks(storage: Arc, tasks: Arc>>) { + let mut is_suspended = false; + loop { + if is_suspended { + let guard = tasks.lock().await; + if guard.first().map(|t| *t != AccTask::Resume).unwrap_or(true) { + tokio::time::sleep(Duration::from_secs(10)).await; + continue; + } + } + let maybe_task = { + let mut guard = tasks.lock().await; + guard.pop_first() + }; + match maybe_task { + Some(AccTask::CalcEpoch(epoch)) => calc_acc_nft_checksums(&storage, epoch).await, + Some(AccTask::Suspend) => is_suspended = true, + Some(AccTask::Resume) => is_suspended = false, + None => tokio::time::sleep(Duration::from_secs(10)).await, + }; + } +} + +/// Bubblegum checksums calculation start point. +/// Iterates over all the bubblegum tree changes in the given epoch and calculates epoch +/// and grand epoch checksums. +pub async fn calc_bubblegum_checksums(storage: &Storage, epoch: u32, only_tree: Option) { + // For now let's just ignore trees that are updated while the calculation is in progress; + // anyway we'll have a separate notification for each late tree update. + let trees_updated_in_the_process = calc_bubblegum_epoch(storage, epoch, only_tree).await; + let invalidated_grand_epoch_trees = + calc_bubblegum_grand_epoch(storage, grand_epoch_of_epoch(epoch), only_tree).await; + if only_tree.is_none() { + tracing::info!( + "Calculated bubblegum epoch {epoch}.
{} epoch and {} grand epoch trees were updated in the process", + trees_updated_in_the_process.len(), invalidated_grand_epoch_trees.len(), + ); + } } /// Calculates and stores bubblegum epoch checksums for bubblegum updates @@ -85,14 +355,20 @@ pub async fn calc_bubblegum_checksums(storage: &Storage, epoch: u32) { /// ## Args: /// * `storage` - database /// * `target_epoch` - the number of an epoch the checksum should be calculated for -async fn calc_bubblegum_epoch(storage: &Storage, target_epoch: u32) { - // iterate over changes and calculate checksum per tree. - +async fn calc_bubblegum_epoch( + storage: &Storage, + target_epoch: u32, + only_tree: Option, +) -> Vec { + let mut to_recalc = Vec::new(); let mut current_tree: Option = None; - let mut it = storage - .bubblegum_changes - .iter(BubblegumChangeKey::epoch_start_key(target_epoch)); + let start_key = if let Some(tree) = only_tree { + BubblegumChangeKey::tree_epoch_start_key(tree, target_epoch) + } else { + BubblegumChangeKey::epoch_start_key(target_epoch) + }; + let mut it = storage.bubblegum_changes.iter(start_key); let mut hasher = Hasher::default(); while let Some(Ok((k, v))) = it.next() { @@ -102,6 +378,12 @@ async fn calc_bubblegum_epoch(storage: &Storage, target_epoch: u32) { if change_key.epoch > target_epoch { break; } + if only_tree + .map(|t| t != change_key.tree_pubkey) + .unwrap_or(false) + { + break; + } if current_tree != Some(change_key.tree_pubkey) { if current_tree.is_some() { // write checksum for previous tree @@ -116,14 +398,13 @@ async fn calc_bubblegum_epoch(storage: &Storage, target_epoch: u32) { if let Ok(Some(storage_consistency::BUBBLEGUM_EPOCH_INVALIDATED)) = storage.bubblegum_epochs.get_async(epoch_key).await { - // TODO: Means more changes for the tree have come while the epoch chechsum calculation was in process. - // How to handle? 
+ to_recalc.push(current_tree.unwrap()); } } current_tree = Some(change_key.tree_pubkey); let new_epoch_key = BubblegumEpochKey::new(current_tree.unwrap(), target_epoch); - storage + let _ = storage .bubblegum_epochs .put_async(new_epoch_key, BUBBLEGUM_EPOCH_CALCULATING) .await; @@ -140,16 +421,35 @@ async fn calc_bubblegum_epoch(storage: &Storage, target_epoch: u32) { epoch_num: target_epoch, }; let epoch_val = BubblegumEpoch::from(hasher.result().to_bytes()); - let _ = storage.bubblegum_epochs.merge(epoch_key, epoch_val).await; + let _ = storage + .bubblegum_epochs + .merge(epoch_key.clone(), epoch_val) + .await; + if let Ok(Some(storage_consistency::BUBBLEGUM_EPOCH_INVALIDATED)) = + storage.bubblegum_epochs.get_async(epoch_key).await + { + to_recalc.push(current_tree); + } } + + to_recalc } -async fn calc_bubblegum_grand_epoch(storage: &Storage, target_grand_epoch: u16) { +async fn calc_bubblegum_grand_epoch( + storage: &Storage, + target_grand_epoch: u16, + only_tree: Option, +) -> Vec { + let mut to_recalc = Vec::new(); let mut current_tree: Option = None; + let mut contains_invalidated_epoch = false; - let mut it = storage - .bubblegum_epochs - .iter(BubblegumEpochKey::grand_epoch_start_key(target_grand_epoch)); + let start_key = if let Some(tree) = only_tree { + BubblegumEpochKey::tree_grand_epoch_start_key(tree, target_grand_epoch) + } else { + BubblegumEpochKey::grand_epoch_start_key(target_grand_epoch) + }; + let mut it = storage.bubblegum_epochs.iter(start_key); let mut hasher = Hasher::default(); while let Some(Ok((k, v))) = it.next() { @@ -160,26 +460,47 @@ async fn calc_bubblegum_grand_epoch(storage: &Storage, target_grand_epoch: u16) if element_grand_epoch > target_grand_epoch { break; } + if only_tree + .map(|t| t != epoch_key.tree_pubkey) + .unwrap_or(false) + { + break; + } + if v.as_ref() == storage_consistency::BUBBLEGUM_EPOCH_INVALIDATED_BYTES.as_slice() { + contains_invalidated_epoch = true; + let new_grand_epoch_key = + BubblegumGrandEpochKey::new(current_tree.unwrap(), target_grand_epoch); + let _ = storage + .bubblegum_grand_epochs + .put_async(new_grand_epoch_key, BUBBLEGUM_GRAND_EPOCH_INVALIDATED) + .await; + } if current_tree != Some(epoch_key.tree_pubkey) { if current_tree.is_some() { - // write checksum for previous tree - let grand_epoch_key = - BubblegumGrandEpochKey::new(current_tree.unwrap(), target_grand_epoch); - let grand_epoch_val = BubblegumGrandEpoch::from(hasher.result().to_bytes()); - let _ = storage - .bubblegum_grand_epochs - .merge(grand_epoch_key.clone(), grand_epoch_val) - .await; - - if let Ok(Some(storage_consistency::BUBBLEGUM_GRAND_EPOCH_INVALIDATED)) = storage - .bubblegum_grand_epochs - .get_async(grand_epoch_key) - .await - { - // TODO: ??? 
+ if !contains_invalidated_epoch { + // write checksum for previous tree + let grand_epoch_key = + BubblegumGrandEpochKey::new(current_tree.unwrap(), target_grand_epoch); + let grand_epoch_val = BubblegumGrandEpoch::from(hasher.result().to_bytes()); + let _ = storage + .bubblegum_grand_epochs + .merge(grand_epoch_key.clone(), grand_epoch_val) + .await; + + if let Ok(Some(storage_consistency::BUBBLEGUM_GRAND_EPOCH_INVALIDATED)) = + storage + .bubblegum_grand_epochs + .get_async(grand_epoch_key) + .await + { + to_recalc.push(current_tree.unwrap()); + } + } else { + to_recalc.push(current_tree.unwrap()); } } current_tree = Some(epoch_key.tree_pubkey); + contains_invalidated_epoch = false; let new_grand_epoch_key = BubblegumGrandEpochKey::new(current_tree.unwrap(), target_grand_epoch); @@ -189,6 +510,8 @@ async fn calc_bubblegum_grand_epoch(storage: &Storage, target_grand_epoch: u16) .await; hasher = Hasher::default(); + } else if contains_invalidated_epoch { + continue; } hasher.hash(&k); hasher.hash(&v); @@ -202,16 +525,25 @@ async fn calc_bubblegum_grand_epoch(storage: &Storage, target_grand_epoch: u16) let grand_epoch_val = BubblegumGrandEpoch::from(hasher.result().to_bytes()); let _ = storage .bubblegum_grand_epochs - .merge(grand_epoch_key, grand_epoch_val) + .merge(grand_epoch_key.clone(), grand_epoch_val) .await; + if let Ok(Some(storage_consistency::BUBBLEGUM_GRAND_EPOCH_INVALIDATED)) = storage + .bubblegum_grand_epochs + .get_async(grand_epoch_key) + .await + { + to_recalc.push(current_tree); + } } + + to_recalc } pub async fn calc_acc_nft_checksums(storage: &Storage, epoch: u32) { - match calc_acc_latest_state(&storage, epoch).await { + match calc_acc_latest_state(storage, epoch).await { Ok((invalidated_buckets, invalidated_grand_buckets)) => { - calc_acc_buckets(&storage, invalidated_buckets.iter()).await; - calc_acc_grand_buckets(&storage, invalidated_grand_buckets.iter()).await; + calc_acc_buckets(storage, invalidated_buckets.iter()).await; + calc_acc_grand_buckets(storage, invalidated_grand_buckets.iter()).await; } Err(e) => tracing::warn!("Error calculating accounts checksum: {e}"), }; @@ -300,7 +632,7 @@ async fn update_acc_if_needed( .unwrap_or(true); if need_to_update { - storage + let _ = storage .acc_nft_last .put_async(acc_key, AccountNft::new(change.slot, change.write_version)) .await; @@ -308,10 +640,13 @@ async fn update_acc_if_needed( let bucket = bucket_for_acc(change.account_pubkey); let grand_bucket = grand_bucket_for_bucket(bucket); if !invalidated_grand_buckets.contains(&grand_bucket) { - let _ = storage.acc_nft_grand_buckets.put_async( - AccountNftGrandBucketKey::new(grand_bucket), - ACC_GRAND_BUCKET_INVALIDATE, - ); + let _ = storage + .acc_nft_grand_buckets + .put_async( + AccountNftGrandBucketKey::new(grand_bucket), + ACC_GRAND_BUCKET_INVALIDATE, + ) + .await; invalidated_grand_buckets.insert(grand_bucket); } diff --git a/nft_ingester/src/fork_cleaner.rs b/nft_ingester/src/fork_cleaner.rs index e4cf965a..ce5bd96e 100644 --- a/nft_ingester/src/fork_cleaner.rs +++ b/nft_ingester/src/fork_cleaner.rs @@ -14,6 +14,8 @@ use tokio::time::sleep as tokio_sleep; use tokio::time::Instant; use tracing::info; +use crate::consistency_calculator::NftChangesTracker; + const CI_ITEMS_DELETE_BATCH_SIZE: usize = 100; const SLOT_CHECK_OFFSET: u64 = 1500; @@ -49,6 +51,7 @@ where cl_items_manager: Arc, fork_checker: Arc, data_consistency_storage: Arc, + nft_changes_tracker: Option>, metrics: Arc, } @@ -61,12 +64,14 @@ where cl_items_manager: Arc, fork_checker: Arc, 
data_consistency_storage: Arc, + nft_changes_tracker: Option>, metrics: Arc, ) -> Self { Self { cl_items_manager, fork_checker, data_consistency_storage, + nft_changes_tracker, metrics, } } @@ -165,6 +170,11 @@ where self.data_consistency_storage .drop_forked_bubblegum_changes(&changes_to_delete) .await; + if let Some(changes_tracker) = self.nft_changes_tracker.as_ref() { + changes_tracker + .watch_remove_forked_bubblegum_changes(&changes_to_delete) + .await; + } self.delete_tree_seq_idx(&mut delete_items).await; } } @@ -173,6 +183,11 @@ where self.data_consistency_storage .drop_forked_bubblegum_changes(&changes_to_delete) .await; + if let Some(changes_tracker) = self.nft_changes_tracker.as_ref() { + changes_tracker + .watch_remove_forked_bubblegum_changes(&changes_to_delete) + .await; + } self.delete_tree_seq_idx(&mut delete_items).await; } diff --git a/nft_ingester/src/lib.rs b/nft_ingester/src/lib.rs index c33251e6..774dfa1c 100644 --- a/nft_ingester/src/lib.rs +++ b/nft_ingester/src/lib.rs @@ -28,7 +28,6 @@ pub mod redis_receiver; pub mod rocks_db; pub mod scheduler; pub mod sequence_consistent; -pub mod solana_slot_service; pub mod tcp_receiver; pub mod token_updates_processor; pub mod transaction_ingester; diff --git a/nft_ingester/src/solana_slot_service.rs b/nft_ingester/src/solana_slot_service.rs deleted file mode 100644 index 7c6250ff..00000000 --- a/nft_ingester/src/solana_slot_service.rs +++ /dev/null @@ -1,50 +0,0 @@ -use std::{sync::Arc, time::Duration}; - -use rocks_db::storage_consistency::{ - current_estimated_epoch, epoch_of_slot, slots_to_next_epoch, update_estimated_epoch, -}; -use solana_client::nonblocking::rpc_client::RpcClient; - -use crate::consistency_calculator::ConsistencyCalcMsg; - -const SLOT_TIME: u64 = 400; - -/// Background job that monitors current solana slot number, -/// and notifies downstream components about epoch change. -/// This component is an __optional__ part of p2p consistency checking mechanis. 
-pub struct SolanaSlotService { - rpc_client: Arc, - epoch_changed_notifier: tokio::sync::mpsc::UnboundedSender, -} - -const MIN_LAG_TO_CHECK_SLOT: u64 = 60; - -impl SolanaSlotService { - async fn run_epoch_changed_notifier(&self) { - loop { - let estimated_epoch = current_estimated_epoch(); - - let Ok(solana_slot) = self.rpc_client.get_slot().await else { - tokio::time::sleep(Duration::from_secs(MIN_LAG_TO_CHECK_SLOT)).await; - continue; - }; - let current_epoch = epoch_of_slot(solana_slot); - - if current_epoch > estimated_epoch { - update_estimated_epoch(current_epoch); - let _ = self - .epoch_changed_notifier - .send(ConsistencyCalcMsg::EpochChanged { - epoch: current_epoch, - }); - } else { - let wait_more_slots = slots_to_next_epoch(solana_slot); - let mut seconds_to_wait = (wait_more_slots * SLOT_TIME) / 1000; - if seconds_to_wait < MIN_LAG_TO_CHECK_SLOT { - seconds_to_wait += MIN_LAG_TO_CHECK_SLOT; - } - tokio::time::sleep(Duration::from_secs(seconds_to_wait)).await; - } - } - } -} diff --git a/nft_ingester/tests/batch_mint_test.rs b/nft_ingester/tests/batch_mint_test.rs index 3d1781e6..0717c321 100644 --- a/nft_ingester/tests/batch_mint_test.rs +++ b/nft_ingester/tests/batch_mint_test.rs @@ -194,6 +194,7 @@ async fn save_batch_mint_to_queue_test() { env.rocks_env.storage.clone(), Arc::new(IngesterMetricsConfig::new()), tasks.clone(), + None, ); let metadata_url = "url".to_string(); diff --git a/nft_ingester/tests/bubblegum_tests.rs b/nft_ingester/tests/bubblegum_tests.rs index 1c7ceaf0..b2f0cc8a 100644 --- a/nft_ingester/tests/bubblegum_tests.rs +++ b/nft_ingester/tests/bubblegum_tests.rs @@ -89,6 +89,7 @@ mod tests { env.rocks_env.storage.clone(), Arc::new(IngesterMetricsConfig::new()), buffer.json_tasks.clone(), + None, )); let tx_ingester = Arc::new(transaction_ingester::BackfillTransactionIngester::new( @@ -215,6 +216,7 @@ mod tests { env.rocks_env.storage.clone(), Arc::new(IngesterMetricsConfig::new()), buffer.json_tasks.clone(), + None, )); let tx_ingester = Arc::new(transaction_ingester::BackfillTransactionIngester::new( diff --git a/nft_ingester/tests/clean_forks_test.rs b/nft_ingester/tests/clean_forks_test.rs index d3885f3e..dbaf6649 100644 --- a/nft_ingester/tests/clean_forks_test.rs +++ b/nft_ingester/tests/clean_forks_test.rs @@ -644,6 +644,7 @@ async fn test_clean_forks() { storage.clone(), storage.clone(), storage.clone(), + None, metrics_state.fork_cleaner_metrics.clone(), ); fork_cleaner.clean_forks(rx.resubscribe()).await; @@ -974,6 +975,7 @@ async fn test_process_forked_transaction() { storage.clone(), storage.clone(), storage.clone(), + None, metrics_state.fork_cleaner_metrics.clone(), ); fork_cleaner.clean_forks(shutdown_rx.resubscribe()).await; diff --git a/nft_ingester/tests/consistency_calculator_test.rs b/nft_ingester/tests/consistency_calculator_test.rs index 07bde969..b4f3c66e 100644 --- a/nft_ingester/tests/consistency_calculator_test.rs +++ b/nft_ingester/tests/consistency_calculator_test.rs @@ -1,12 +1,15 @@ #[cfg(test)] mod tests { - use nft_ingester::consistency_calculator::{calc_acc_nft_checksums, calc_bubblegum_checksums}; + use nft_ingester::consistency_calculator::ConsistencyCalcMsg; + use nft_ingester::consistency_calculator::{ + calc_acc_nft_checksums, calc_bubblegum_checksums, NftChangesTracker, + }; use rocks_db::{ column::TypedColumn, storage_consistency::{ - AccountNft, AccountNftBucket, AccountNftBucketKey, AccountNftChange, - AccountNftChangeKey, AccountNftKey, BubblegumChange, BubblegumChangeKey, - BubblegumEpoch, BubblegumEpochKey, 
BubblegumGrandEpochKey, Checksum, + AccountNft, AccountNftBucketKey, AccountNftChange, AccountNftChangeKey, AccountNftKey, + BubblegumChange, BubblegumChangeKey, BubblegumEpoch, BubblegumEpochKey, + BubblegumGrandEpochKey, Checksum, }, }; use setup::rocks::RocksTestEnvironment; @@ -63,7 +66,7 @@ mod tests { .unwrap(); // Calculate epoch and grand epoch checksum - calc_bubblegum_checksums(&storage, 1).await; + calc_bubblegum_checksums(&storage, 1, None).await; let expected_epoch_checksum = { let mut hasher = Hasher::default(); @@ -216,6 +219,27 @@ mod tests { ); } + #[tokio::test] + async fn test_notification_on_epoch_change() { + let storage = RocksTestEnvironment::new(&[]).storage; + let (sender, mut receiver) = tokio::sync::mpsc::channel(1000); + let sut = NftChangesTracker::new(storage, sender); + + let tree = Pubkey::new_unique(); + + sut.watch_bubblegum_change(tree, 90_001).await; + sut.watch_bubblegum_change(tree, 100_001).await; + + assert_eq!( + Ok(ConsistencyCalcMsg::EpochChanged { new_epoch: 10 }), + receiver.try_recv() + ); + assert_eq!( + Err(tokio::sync::mpsc::error::TryRecvError::Empty), + receiver.try_recv() + ); + } + fn make_pubkey_in_bucket(bucket: u16) -> Pubkey { let mut arr = Pubkey::new_unique().to_bytes(); let bucket_arr = bucket.to_be_bytes(); diff --git a/nft_ingester/tests/decompress.rs b/nft_ingester/tests/decompress.rs index 8519093a..18e3ac84 100644 --- a/nft_ingester/tests/decompress.rs +++ b/nft_ingester/tests/decompress.rs @@ -72,6 +72,7 @@ mod tests { env_rocks, Arc::new(IngesterMetricsConfig::new()), buffer.json_tasks.clone(), + None, )); let tx_ingester = Arc::new(transaction_ingester::BackfillTransactionIngester::new( diff --git a/rocks-db/src/storage_consistency.rs b/rocks-db/src/storage_consistency.rs index b9e0f086..f5e13029 100644 --- a/rocks-db/src/storage_consistency.rs +++ b/rocks-db/src/storage_consistency.rs @@ -20,20 +20,24 @@ use solana_sdk::pubkey::Pubkey; use crate::{column::TypedColumn, transaction::TreeUpdate, Storage}; -use std::{cell::Cell, collections::HashSet}; +use std::{collections::HashSet, sync::atomic::AtomicU64}; -/// Holds the number for an epoch that is happening at the moment -/// It should be super presize, i.e. it is OK (and even preferable) -/// to have a small lag between real epoch change (current solana slot) -/// and update of this variable. -const CURRENT_ESTIMATED_EPOCH: Cell = Cell::new(0); +static LAST_SLOT: AtomicU64 = AtomicU64::new(0); pub fn current_estimated_epoch() -> u32 { - CURRENT_ESTIMATED_EPOCH.get() + epoch_of_slot(LAST_SLOT.load(std::sync::atomic::Ordering::Relaxed)) } -pub fn update_estimated_epoch(new_value: u32) { - CURRENT_ESTIMATED_EPOCH.set(new_value); +pub fn last_tracked_slot() -> u64 { + LAST_SLOT.load(std::sync::atomic::Ordering::Relaxed) +} + +pub fn track_slot_counter(slot: u64) -> u64 { + let prev = LAST_SLOT.load(std::sync::atomic::Ordering::Relaxed); + if slot > prev { + LAST_SLOT.store(slot, std::sync::atomic::Ordering::Relaxed); + } + prev } pub fn epoch_of_slot(slot: u64) -> u32 { @@ -64,8 +68,8 @@ pub fn slots_to_next_epoch(slot: u64) -> u64 { /// which means we have 65536 buckets. /// This allows to have records in "account NFT changes" collumn family /// "grouped" by the bucket number. 
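(Aside: a minimal sketch of the bucket arithmetic described above. That the two leading pubkey bytes are read big-endian is an assumption supported by the `to_be_bytes` usage in `make_pubkey_in_bucket` in the tests: 65,536 buckets, grouped into 1,024 grand buckets of 64 buckets each.)

```rust
use solana_sdk::pubkey::Pubkey;

// The bucket is the first two bytes of the account pubkey read as a
// big-endian u16 (an assumption based on to_be_bytes in the tests),
// giving 65,536 buckets; a grand bucket groups 64 of them (bucket >> 6).
fn bucket_for_acc(account_pubkey: Pubkey) -> u16 {
    let bytes = account_pubkey.to_bytes();
    u16::from_be_bytes([bytes[0], bytes[1]])
}

fn grand_bucket_for_bucket(bucket: u16) -> u16 {
    bucket >> 6
}

fn main() {
    let pk = Pubkey::new_unique();
    let bucket = bucket_for_acc(pk);
    assert!(grand_bucket_for_bucket(bucket) < 1024);
}
```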
-pub fn bucket_for_acc(accout_pubkey: Pubkey) -> u16 { - let bytes = accout_pubkey.to_bytes(); +pub fn bucket_for_acc(account_pubkey: Pubkey) -> u16 { + let bytes = account_pubkey.to_bytes(); let mut b = <[u8; 2]>::default(); b.clone_from_slice(&bytes[0..2]); @@ -78,8 +82,8 @@ pub fn grand_bucket_for_bucket(bucket: u16) -> u16 { bucket >> 6 } -pub fn grand_bucket_for_acc(accout_pubkey: Pubkey) -> u16 { - grand_bucket_for_bucket(bucket_for_acc(accout_pubkey)) +pub fn grand_bucket_for_acc(account_pubkey: Pubkey) -> u16 { + grand_bucket_for_bucket(bucket_for_acc(account_pubkey)) } pub const BUBBLEGUM_EPOCH_INVALIDATED: BubblegumEpoch = BubblegumEpoch { @@ -110,8 +114,7 @@ pub const ACC_GRAND_BUCKET_INVALIDATE: AccountNftGrandBucket = AccountNftGrandBu /// Since the arrival of Solana data is asynchronous and has no strict order guarantees, /// we can easily fall into a situation when we are in the process of calculation /// of a checksum for an epoch, and a new update came befor the checksum has been written. -/// ```norun -/// +/// ```img /// epoch end a change for previous epoch arrived /// | | /// V V @@ -170,7 +173,7 @@ impl BubblegumChangeKey { pub fn tree_epoch_start_key(tree_pubkey: Pubkey, epoch: u32) -> BubblegumChangeKey { BubblegumChangeKey { epoch, - tree_pubkey: tree_pubkey, + tree_pubkey, slot: first_slot_in_epoch(epoch), seq: 0, } @@ -219,6 +222,12 @@ impl BubblegumEpochKey { epoch_num: first_epoch_in_grand_epoch(grand_epoch), } } + pub fn tree_grand_epoch_start_key(tree_pubkey: Pubkey, grand_epoch: u16) -> BubblegumEpochKey { + BubblegumEpochKey { + tree_pubkey, + epoch_num: first_epoch_in_grand_epoch(grand_epoch), + } + } } #[derive(Serialize, Deserialize, Debug, Clone, Hash, PartialEq, Eq)] @@ -472,8 +481,7 @@ impl Storage { batch: &mut rocksdb::WriteBatch, tree_update: &TreeUpdate, ) -> crate::Result<()> { - let key = - BubblegumChangeKey::new(tree_update.tree.clone(), tree_update.slot, tree_update.seq); + let key = BubblegumChangeKey::new(tree_update.tree, tree_update.slot, tree_update.seq); let value = BubblegumChange { signature: tree_update.tx.clone(), }; @@ -564,7 +572,9 @@ impl DataConsistencyStorage for Storage { // TODO: Replace with LazyLock after rustc update. lazy_static::lazy_static! { + pub static ref BUBBLEGUM_EPOCH_INVALIDATED_BYTES: Vec = bincode::serialize(&BUBBLEGUM_EPOCH_INVALIDATED).unwrap(); pub static ref BUBBLEGUM_EPOCH_CALCULATING_BYTES: Vec = bincode::serialize(&BUBBLEGUM_EPOCH_CALCULATING).unwrap(); + pub static ref BUBBLEGUM_GRAND_EPOCH_INVALIDATED_BYTES: Vec = bincode::serialize(&BUBBLEGUM_GRAND_EPOCH_INVALIDATED).unwrap(); pub static ref BUBBLEGUM_GRAND_EPOCH_CALCULATING_BYTES: Vec = bincode::serialize(&BUBBLEGUM_GRAND_EPOCH_CALCULATING).unwrap(); }
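(Aside: the `BbgmTask`/`AccTask` queues above rely on derived `Ord`, where enum variants compare by declaration order first, so a `BTreeSet` drained via `pop_first` behaves as a small priority queue: `Suspend`/`Resume` sort before whole-epoch tasks, which sort before per-tree tasks. A minimal self-contained sketch, using a byte array as a stand-in for `Pubkey`:)

```rust
use std::collections::BTreeSet;

// Derived Ord on an enum orders values by variant declaration order first,
// so a BTreeSet of tasks acts as a priority queue when drained via pop_first.
#[derive(Debug, PartialEq, Eq, PartialOrd, Ord)]
enum Task {
    Suspend,
    Resume,
    CalcEpoch(u32),
    CalcTree(u32, [u8; 32]), // [u8; 32] stands in for Pubkey to keep the sketch dependency-free
}

fn main() {
    let mut tasks = BTreeSet::new();
    tasks.insert(Task::CalcTree(7, [1; 32]));
    tasks.insert(Task::CalcEpoch(7));
    tasks.insert(Task::Resume);

    // Control messages first, then whole epochs, then individual trees.
    assert_eq!(tasks.pop_first(), Some(Task::Resume));
    assert_eq!(tasks.pop_first(), Some(Task::CalcEpoch(7)));
    assert_eq!(tasks.pop_first(), Some(Task::CalcTree(7, [1; 32])));
}
```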