Skip to content

Commit

Permalink
Saving acc NFT change in the same batch as account data
Browse files Browse the repository at this point in the history
  • Loading branch information
snorochevskiy committed Nov 1, 2024
1 parent 23cb90a commit 8595abb
Show file tree
Hide file tree
Showing 7 changed files with 77 additions and 36 deletions.
2 changes: 1 addition & 1 deletion nft_ingester/src/accounts_processor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -235,7 +235,7 @@ impl<T: UnprocessedAccountsGetter> AccountsProcessor<T> {
let (account_pubkey, slot, write_version) =
unprocessed_account.solana_change_info();
nft_changes_tracker
.track_account_change(account_pubkey, slot, write_version)
.track_account_change(batch_storage, account_pubkey, slot, write_version)
.await;
}
self.metrics
Expand Down
2 changes: 1 addition & 1 deletion nft_ingester/src/bin/ingester/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -189,7 +189,7 @@ pub async fn main() -> Result<(), IngesterError> {

let rpc_client = Arc::new(RpcClient::new(config.rpc_host.clone()));
let (nft_change_snd, nft_change_rcv) = mpsc::channel(NTF_CHANGES_NOTIFICATION_QUEUE_SIZE);
let changes_tracker = Arc::new(NftChangesTracker::new(primary_rocks_storage.clone(), nft_change_snd.clone()));
let changes_tracker = Arc::new(NftChangesTracker::new(nft_change_snd.clone()));
consistency_calculator::run_bg_consistency_calculator(
nft_change_rcv,
primary_rocks_storage.clone(),
Expand Down
5 changes: 1 addition & 4 deletions nft_ingester/src/bin/raw_backfiller/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -144,10 +144,7 @@ pub async fn main() -> Result<(), IngesterError> {
let (shutdown_tx, shutdown_rx) = broadcast::channel::<()>(1);

let (nft_change_snd, nft_change_rcv) = mpsc::channel(NTF_CHANGES_NOTIFICATION_QUEUE_SIZE);
let changes_tracker = Arc::new(NftChangesTracker::new(
rocks_storage.clone(),
nft_change_snd.clone(),
));
let changes_tracker = Arc::new(NftChangesTracker::new(nft_change_snd.clone()));
consistency_calculator::run_bg_consistency_calculator(
nft_change_rcv,
rocks_storage.clone(),
Expand Down
63 changes: 34 additions & 29 deletions nft_ingester/src/consistency_calculator.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
//! and account-based NFT checsums that are used in peer-to-peer
//! Aura nodes communication to identify missing data on a node.

use rocks_db::batch_savers::BatchSaveStorage;
use rocks_db::{
column::TypedColumn,
storage_consistency::{
Expand Down Expand Up @@ -48,25 +49,26 @@ pub enum ConsistencyCalcMsg {
/// and notifying checksum calculator when a whole epoch,
/// or an individual bubblegum three/account checksum should be calculated.
pub struct NftChangesTracker {
storage: Arc<Storage>,
sender: Sender<ConsistencyCalcMsg>,
}

impl NftChangesTracker {
pub fn new(storage: Arc<Storage>, sender: Sender<ConsistencyCalcMsg>) -> NftChangesTracker {
NftChangesTracker { storage, sender }
pub fn new(sender: Sender<ConsistencyCalcMsg>) -> NftChangesTracker {
NftChangesTracker { sender }
}

/// Persists given account NFT change into the sotrage, and, if the change is from the epoch
/// that is previous to the current epoch, then also notifies checksums calculator
/// about late data.
///
/// ## Args:
/// * `batch_storage` - same batch storage that is used to save account data
/// * `account_pubkey` - Pubkey of the NFT account
/// * `slot` - the slot number that change is made in
/// * `write_version` - write version of the change
pub async fn track_account_change(
&self,
batch_storage: &mut BatchSaveStorage,
account_pubkey: Pubkey,
slot: u64,
write_version: u64,
Expand All @@ -86,22 +88,15 @@ impl NftChangesTracker {
if epoch < last_slot_epoch {
let bucket = bucket_for_acc(account_pubkey);
let grand_bucket = grand_bucket_for_bucket(bucket);
let _ = self
.storage
.acc_nft_grand_buckets
.put_async(
AccountNftGrandBucketKey::new(grand_bucket),
ACC_GRAND_BUCKET_INVALIDATE,
)
.await;
let _ = self
.storage
.acc_nft_buckets
.put_async(AccountNftBucketKey::new(bucket), ACC_BUCKET_INVALIDATE)
.await;
let _ = batch_storage.put_acc_grand_bucket(
AccountNftGrandBucketKey::new(grand_bucket),
ACC_GRAND_BUCKET_INVALIDATE,
);
let _ = batch_storage
.put_acc_bucket(AccountNftBucketKey::new(bucket), ACC_BUCKET_INVALIDATE);
}

let _ = self.storage.acc_nft_changes.put_async(key, value).await;
let _ = batch_storage.put_account_change(key, value);

if epoch < last_slot_epoch {
let _ = self
Expand Down Expand Up @@ -278,14 +273,17 @@ async fn process_bbgm_tasks(storage: Arc<Storage>, tasks: Arc<Mutex<BTreeSet<Bbg
let mut is_suspended = false;
loop {
if is_suspended {
let guard = tasks.lock().await;
if guard
.first()
.map(|t| *t != BbgmTask::Resume)
.unwrap_or(true)
{
tokio::time::sleep(Duration::from_secs(10)).await;
continue;
let mut guard = tasks.lock().await;
match guard.first() {
Some(t) if *t != BbgmTask::Resume => (),
Some(t) if *t != BbgmTask::Suspend => {
guard.pop_first();
continue;
}
_ => {
tokio::time::sleep(Duration::from_secs(10)).await;
continue;
}
}
}
let maybe_task = {
Expand Down Expand Up @@ -313,10 +311,17 @@ async fn process_acc_tasks(storage: Arc<Storage>, tasks: Arc<Mutex<BTreeSet<AccT
let mut is_suspended = false;
loop {
if is_suspended {
let guard = tasks.lock().await;
if guard.first().map(|t| *t != AccTask::Resume).unwrap_or(true) {
tokio::time::sleep(Duration::from_secs(10)).await;
continue;
let mut guard = tasks.lock().await;
match guard.first() {
Some(t) if *t != AccTask::Resume => (),
Some(t) if *t != AccTask::Suspend => {
guard.pop_first();
continue;
}
_ => {
tokio::time::sleep(Duration::from_secs(10)).await;
continue;
}
}
}
let maybe_task = {
Expand Down
2 changes: 1 addition & 1 deletion nft_ingester/tests/consistency_calculator_test.rs
Original file line number Diff line number Diff line change
Expand Up @@ -223,7 +223,7 @@ mod tests {
async fn test_notification_on_epoch_change() {
let storage = RocksTestEnvironment::new(&[]).storage;
let (sender, mut receiver) = tokio::sync::mpsc::channel(1000);
let sut = NftChangesTracker::new(storage, sender);
let sut = NftChangesTracker::new(sender);

let tree = Pubkey::new_unique();

Expand Down
27 changes: 27 additions & 0 deletions rocks-db/src/batch_savers.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,8 @@
use crate::asset::{AssetCollection, MetadataMintMap};
use crate::storage_consistency::{
AccountNftBucket, AccountNftBucketKey, AccountNftChange, AccountNftChangeKey,
AccountNftGrandBucket, AccountNftGrandBucketKey,
};
use crate::token_accounts::{TokenAccountMintOwnerIdx, TokenAccountOwnerIdx};
use crate::Result;
use crate::{AssetAuthority, AssetDynamicDetails, AssetOwner, AssetStaticDetails, Storage};
Expand Down Expand Up @@ -168,6 +172,29 @@ impl BatchSaveStorage {
)?;
Ok(())
}
pub fn put_account_change(
&mut self,
key: AccountNftChangeKey,
v: AccountNftChange,
) -> Result<()> {
self.storage
.acc_nft_changes
.put_with_batch(&mut self.batch, key, &v)
}
pub fn put_acc_bucket(&mut self, key: AccountNftBucketKey, v: AccountNftBucket) -> Result<()> {
self.storage
.acc_nft_buckets
.put_with_batch(&mut self.batch, key, &v)
}
pub fn put_acc_grand_bucket(
&mut self,
key: AccountNftGrandBucketKey,
v: AccountNftGrandBucket,
) -> Result<()> {
self.storage
.acc_nft_grand_buckets
.put_with_batch(&mut self.batch, key, &v)
}
pub fn asset_updated_with_batch(&mut self, slot: u64, pubkey: Pubkey) -> Result<()> {
self.storage
.asset_updated_with_batch(&mut self.batch, slot, pubkey)?;
Expand Down
12 changes: 12 additions & 0 deletions rocks-db/src/storage_consistency.rs
Original file line number Diff line number Diff line change
Expand Up @@ -309,6 +309,18 @@ pub struct AccountNftChangeKey {
pub write_version: u64,
}

impl AccountNftChangeKey {
pub fn new(account_pubkey: Pubkey, slot: u64, write_version: u64) -> AccountNftChangeKey {
let epoch = epoch_of_slot(slot);
AccountNftChangeKey {
epoch,
account_pubkey,
slot,
write_version,
}
}
}

#[derive(Serialize, Deserialize, Debug, Clone, Hash, PartialEq, Eq)]
pub struct AccountNftChange {}

Expand Down

0 comments on commit 8595abb

Please sign in to comment.