From 8595abbd2253a632a3d635a510a311431c1f7bb4 Mon Sep 17 00:00:00 2001 From: snorochevskiy Date: Fri, 1 Nov 2024 17:51:15 +0200 Subject: [PATCH] Saving acc NFT change in the same batch as account data --- nft_ingester/src/accounts_processor.rs | 2 +- nft_ingester/src/bin/ingester/main.rs | 2 +- nft_ingester/src/bin/raw_backfiller/main.rs | 5 +- nft_ingester/src/consistency_calculator.rs | 63 ++++++++++--------- .../tests/consistency_calculator_test.rs | 2 +- rocks-db/src/batch_savers.rs | 27 ++++++++ rocks-db/src/storage_consistency.rs | 12 ++++ 7 files changed, 77 insertions(+), 36 deletions(-) diff --git a/nft_ingester/src/accounts_processor.rs b/nft_ingester/src/accounts_processor.rs index 94330619..13d772a0 100644 --- a/nft_ingester/src/accounts_processor.rs +++ b/nft_ingester/src/accounts_processor.rs @@ -235,7 +235,7 @@ impl AccountsProcessor { let (account_pubkey, slot, write_version) = unprocessed_account.solana_change_info(); nft_changes_tracker - .track_account_change(account_pubkey, slot, write_version) + .track_account_change(batch_storage, account_pubkey, slot, write_version) .await; } self.metrics diff --git a/nft_ingester/src/bin/ingester/main.rs b/nft_ingester/src/bin/ingester/main.rs index b3c1612e..8935c643 100644 --- a/nft_ingester/src/bin/ingester/main.rs +++ b/nft_ingester/src/bin/ingester/main.rs @@ -189,7 +189,7 @@ pub async fn main() -> Result<(), IngesterError> { let rpc_client = Arc::new(RpcClient::new(config.rpc_host.clone())); let (nft_change_snd, nft_change_rcv) = mpsc::channel(NTF_CHANGES_NOTIFICATION_QUEUE_SIZE); - let changes_tracker = Arc::new(NftChangesTracker::new(primary_rocks_storage.clone(), nft_change_snd.clone())); + let changes_tracker = Arc::new(NftChangesTracker::new(nft_change_snd.clone())); consistency_calculator::run_bg_consistency_calculator( nft_change_rcv, primary_rocks_storage.clone(), diff --git a/nft_ingester/src/bin/raw_backfiller/main.rs b/nft_ingester/src/bin/raw_backfiller/main.rs index 297f1c6f..ea1a20b5 100644 --- a/nft_ingester/src/bin/raw_backfiller/main.rs +++ b/nft_ingester/src/bin/raw_backfiller/main.rs @@ -144,10 +144,7 @@ pub async fn main() -> Result<(), IngesterError> { let (shutdown_tx, shutdown_rx) = broadcast::channel::<()>(1); let (nft_change_snd, nft_change_rcv) = mpsc::channel(NTF_CHANGES_NOTIFICATION_QUEUE_SIZE); - let changes_tracker = Arc::new(NftChangesTracker::new( - rocks_storage.clone(), - nft_change_snd.clone(), - )); + let changes_tracker = Arc::new(NftChangesTracker::new(nft_change_snd.clone())); consistency_calculator::run_bg_consistency_calculator( nft_change_rcv, rocks_storage.clone(), diff --git a/nft_ingester/src/consistency_calculator.rs b/nft_ingester/src/consistency_calculator.rs index 82ff19d9..c0673aff 100644 --- a/nft_ingester/src/consistency_calculator.rs +++ b/nft_ingester/src/consistency_calculator.rs @@ -2,6 +2,7 @@ //! and account-based NFT checsums that are used in peer-to-peer //! Aura nodes communication to identify missing data on a node. +use rocks_db::batch_savers::BatchSaveStorage; use rocks_db::{ column::TypedColumn, storage_consistency::{ @@ -48,13 +49,12 @@ pub enum ConsistencyCalcMsg { /// and notifying checksum calculator when a whole epoch, /// or an individual bubblegum three/account checksum should be calculated. pub struct NftChangesTracker { - storage: Arc, sender: Sender, } impl NftChangesTracker { - pub fn new(storage: Arc, sender: Sender) -> NftChangesTracker { - NftChangesTracker { storage, sender } + pub fn new(sender: Sender) -> NftChangesTracker { + NftChangesTracker { sender } } /// Persists given account NFT change into the sotrage, and, if the change is from the epoch @@ -62,11 +62,13 @@ impl NftChangesTracker { /// about late data. /// /// ## Args: + /// * `batch_storage` - same batch storage that is used to save account data /// * `account_pubkey` - Pubkey of the NFT account /// * `slot` - the slot number that change is made in /// * `write_version` - write version of the change pub async fn track_account_change( &self, + batch_storage: &mut BatchSaveStorage, account_pubkey: Pubkey, slot: u64, write_version: u64, @@ -86,22 +88,15 @@ impl NftChangesTracker { if epoch < last_slot_epoch { let bucket = bucket_for_acc(account_pubkey); let grand_bucket = grand_bucket_for_bucket(bucket); - let _ = self - .storage - .acc_nft_grand_buckets - .put_async( - AccountNftGrandBucketKey::new(grand_bucket), - ACC_GRAND_BUCKET_INVALIDATE, - ) - .await; - let _ = self - .storage - .acc_nft_buckets - .put_async(AccountNftBucketKey::new(bucket), ACC_BUCKET_INVALIDATE) - .await; + let _ = batch_storage.put_acc_grand_bucket( + AccountNftGrandBucketKey::new(grand_bucket), + ACC_GRAND_BUCKET_INVALIDATE, + ); + let _ = batch_storage + .put_acc_bucket(AccountNftBucketKey::new(bucket), ACC_BUCKET_INVALIDATE); } - let _ = self.storage.acc_nft_changes.put_async(key, value).await; + let _ = batch_storage.put_account_change(key, value); if epoch < last_slot_epoch { let _ = self @@ -278,14 +273,17 @@ async fn process_bbgm_tasks(storage: Arc, tasks: Arc (), + Some(t) if *t != BbgmTask::Suspend => { + guard.pop_first(); + continue; + } + _ => { + tokio::time::sleep(Duration::from_secs(10)).await; + continue; + } } } let maybe_task = { @@ -313,10 +311,17 @@ async fn process_acc_tasks(storage: Arc, tasks: Arc (), + Some(t) if *t != AccTask::Suspend => { + guard.pop_first(); + continue; + } + _ => { + tokio::time::sleep(Duration::from_secs(10)).await; + continue; + } } } let maybe_task = { diff --git a/nft_ingester/tests/consistency_calculator_test.rs b/nft_ingester/tests/consistency_calculator_test.rs index b4f3c66e..b93035ab 100644 --- a/nft_ingester/tests/consistency_calculator_test.rs +++ b/nft_ingester/tests/consistency_calculator_test.rs @@ -223,7 +223,7 @@ mod tests { async fn test_notification_on_epoch_change() { let storage = RocksTestEnvironment::new(&[]).storage; let (sender, mut receiver) = tokio::sync::mpsc::channel(1000); - let sut = NftChangesTracker::new(storage, sender); + let sut = NftChangesTracker::new(sender); let tree = Pubkey::new_unique(); diff --git a/rocks-db/src/batch_savers.rs b/rocks-db/src/batch_savers.rs index b4bf3178..1a07d0c3 100644 --- a/rocks-db/src/batch_savers.rs +++ b/rocks-db/src/batch_savers.rs @@ -1,4 +1,8 @@ use crate::asset::{AssetCollection, MetadataMintMap}; +use crate::storage_consistency::{ + AccountNftBucket, AccountNftBucketKey, AccountNftChange, AccountNftChangeKey, + AccountNftGrandBucket, AccountNftGrandBucketKey, +}; use crate::token_accounts::{TokenAccountMintOwnerIdx, TokenAccountOwnerIdx}; use crate::Result; use crate::{AssetAuthority, AssetDynamicDetails, AssetOwner, AssetStaticDetails, Storage}; @@ -168,6 +172,29 @@ impl BatchSaveStorage { )?; Ok(()) } + pub fn put_account_change( + &mut self, + key: AccountNftChangeKey, + v: AccountNftChange, + ) -> Result<()> { + self.storage + .acc_nft_changes + .put_with_batch(&mut self.batch, key, &v) + } + pub fn put_acc_bucket(&mut self, key: AccountNftBucketKey, v: AccountNftBucket) -> Result<()> { + self.storage + .acc_nft_buckets + .put_with_batch(&mut self.batch, key, &v) + } + pub fn put_acc_grand_bucket( + &mut self, + key: AccountNftGrandBucketKey, + v: AccountNftGrandBucket, + ) -> Result<()> { + self.storage + .acc_nft_grand_buckets + .put_with_batch(&mut self.batch, key, &v) + } pub fn asset_updated_with_batch(&mut self, slot: u64, pubkey: Pubkey) -> Result<()> { self.storage .asset_updated_with_batch(&mut self.batch, slot, pubkey)?; diff --git a/rocks-db/src/storage_consistency.rs b/rocks-db/src/storage_consistency.rs index f5e13029..597d995b 100644 --- a/rocks-db/src/storage_consistency.rs +++ b/rocks-db/src/storage_consistency.rs @@ -309,6 +309,18 @@ pub struct AccountNftChangeKey { pub write_version: u64, } +impl AccountNftChangeKey { + pub fn new(account_pubkey: Pubkey, slot: u64, write_version: u64) -> AccountNftChangeKey { + let epoch = epoch_of_slot(slot); + AccountNftChangeKey { + epoch, + account_pubkey, + slot, + write_version, + } + } +} + #[derive(Serialize, Deserialize, Debug, Clone, Hash, PartialEq, Eq)] pub struct AccountNftChange {}