Skip to content

Commit

Permalink
Adding tracking of nft changes
Browse files Browse the repository at this point in the history
  • Loading branch information
snorochevskiy committed Nov 1, 2024
1 parent 862cf76 commit 23cb90a
Show file tree
Hide file tree
Showing 16 changed files with 581 additions and 161 deletions.
18 changes: 18 additions & 0 deletions entities/src/models.rs
Original file line number Diff line number Diff line change
Expand Up @@ -597,6 +597,24 @@ pub struct UnprocessedAccountMessage {
pub id: String,
}

impl UnprocessedAccountMessage {
pub fn solana_change_info(&self) -> (Pubkey, u64, u64) {
let (slot, write_version) = match &self.account {
UnprocessedAccount::MetadataInfo(v) => (v.slot_updated, v.write_version),
UnprocessedAccount::Token(v) => (v.slot_updated as u64, v.write_version),
UnprocessedAccount::Mint(v) => (v.slot_updated as u64, v.write_version),
UnprocessedAccount::Edition(v) => (v.slot_updated, v.write_version),
UnprocessedAccount::BurnMetadata(v) => (v.slot_updated, v.write_version),
UnprocessedAccount::BurnMplCore(v) => (v.slot_updated, v.write_version),
UnprocessedAccount::MplCore(v) => (v.slot_updated, v.write_version),
UnprocessedAccount::Inscription(v) => (v.slot_updated, v.write_version),
UnprocessedAccount::InscriptionData(v) => (v.slot_updated, v.write_version),
UnprocessedAccount::MplCoreFee(v) => (v.slot_updated, v.write_version),
};
(self.key, slot, write_version)
}
}

pub struct BufferedTxWithID {
pub tx: BufferedTransaction,
pub id: String,
Expand Down
1 change: 1 addition & 0 deletions nft_ingester/benches/ingester_benchmark.rs
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ async fn bench_ingest(
rocks_dest.clone(),
Arc::new(IngesterMetricsConfig::new()),
buffer.json_tasks.clone(),
None,
));

let tx_ingester = Arc::new(transaction_ingester::BackfillTransactionIngester::new(
Expand Down
16 changes: 13 additions & 3 deletions nft_ingester/src/accounts_processor.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
use crate::consistency_calculator::NftChangesTracker;
use crate::error::IngesterError;
use crate::inscriptions_processor::InscriptionsProcessor;
use crate::mpl_core_fee_indexing_processor::MplCoreFeeProcessor;
Expand Down Expand Up @@ -38,6 +39,7 @@ pub async fn run_accounts_processor<AG: UnprocessedAccountsGetter + Sync + Send
metrics: Arc<IngesterMetricsConfig>,
postgre_client: Arc<PgClient>,
rpc_client: Arc<RpcClient>,
nft_changes_tracker: Arc<NftChangesTracker>,
join_set: Arc<Mutex<JoinSet<Result<(), JoinError>>>>,
) {
mutexed_tasks.lock().await.spawn(async move {
Expand All @@ -54,7 +56,7 @@ pub async fn run_accounts_processor<AG: UnprocessedAccountsGetter + Sync + Send
.expect("Failed to build 'AccountsProcessor'!");

account_processor
.process_accounts(rx, rocks_storage, account_buffer_size)
.process_accounts(rx, rocks_storage, account_buffer_size, nft_changes_tracker)
.await;

Ok(())
Expand Down Expand Up @@ -114,6 +116,7 @@ impl<T: UnprocessedAccountsGetter> AccountsProcessor<T> {
rx: Receiver<()>,
storage: Arc<Storage>,
accounts_batch_size: usize,
nft_changes_tracker: Arc<NftChangesTracker>,
) {
let mut batch_storage =
BatchSaveStorage::new(storage, accounts_batch_size, self.metrics.clone());
Expand All @@ -133,7 +136,7 @@ impl<T: UnprocessedAccountsGetter> AccountsProcessor<T> {
continue;
}
};
self.process_account(&mut batch_storage, unprocessed_accounts, &mut core_fees, &mut ack_ids, &mut interval, &mut batch_fill_instant).await;
self.process_account(&mut batch_storage, unprocessed_accounts, &mut core_fees, &mut ack_ids, &mut interval, &mut batch_fill_instant, &nft_changes_tracker).await;
},
_ = interval.tick() => {
self.flush(&mut batch_storage, &mut ack_ids, &mut interval, &mut batch_fill_instant);
Expand All @@ -160,6 +163,7 @@ impl<T: UnprocessedAccountsGetter> AccountsProcessor<T> {
ack_ids: &mut Vec<String>,
interval: &mut tokio::time::Interval,
batch_fill_instant: &mut Instant,
nft_changes_tracker: &NftChangesTracker,
) {
for unprocessed_account in unprocessed_accounts {
let processing_result = match &unprocessed_account.account {
Expand Down Expand Up @@ -227,7 +231,13 @@ impl<T: UnprocessedAccountsGetter> AccountsProcessor<T> {
error!("Processing account {}: {}", unprocessed_account.key, err);
continue;
}
// TODO: write account change
{
let (account_pubkey, slot, write_version) =
unprocessed_account.solana_change_info();
nft_changes_tracker
.track_account_change(account_pubkey, slot, write_version)
.await;
}
self.metrics
.inc_accounts(unprocessed_account.account.into());
ack_ids.push(unprocessed_account.id);
Expand Down
19 changes: 18 additions & 1 deletion nft_ingester/src/bin/ingester/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ use plerkle_messenger::ConsumptionType;
use pprof::ProfilerGuardBuilder;
use rocks_db::bubblegum_slots::{BubblegumSlotGetter, IngestableSlotGetter};
use solana_client::nonblocking::rpc_client::RpcClient;
use tokio::sync::{broadcast, Mutex};
use tokio::sync::{broadcast, mpsc, Mutex};
use tokio::task::JoinSet;
use tokio::time::sleep as tokio_sleep;
use tracing::{error, info, warn};
Expand All @@ -42,6 +42,9 @@ use nft_ingester::buffer::{debug_buffer, Buffer};
use nft_ingester::config::{
setup_config, ApiConfig, BackfillerConfig, BackfillerMode, IngesterConfig, MessageSource, INGESTER_CONFIG_PREFIX,
};
use nft_ingester::consistency_calculator;
use nft_ingester::consistency_calculator::NftChangesTracker;
use nft_ingester::consistency_calculator::NTF_CHANGES_NOTIFICATION_QUEUE_SIZE;
use nft_ingester::fork_cleaner::{run_fork_cleaner, ForkCleaner};
use nft_ingester::gapfiller::{process_asset_details_stream_wrapper, run_sequence_consistent_gapfiller};
use nft_ingester::index_syncronizer::Synchronizer;
Expand Down Expand Up @@ -185,6 +188,14 @@ pub async fn main() -> Result<(), IngesterError> {
}

let rpc_client = Arc::new(RpcClient::new(config.rpc_host.clone()));
let (nft_change_snd, nft_change_rcv) = mpsc::channel(NTF_CHANGES_NOTIFICATION_QUEUE_SIZE);
let changes_tracker = Arc::new(NftChangesTracker::new(primary_rocks_storage.clone(), nft_change_snd.clone()));
consistency_calculator::run_bg_consistency_calculator(
nft_change_rcv,
primary_rocks_storage.clone(),
shutdown_rx.resubscribe(),
);

for _ in 0..config.accounts_parsing_workers {
match config.message_source {
MessageSource::Redis => {
Expand All @@ -206,6 +217,7 @@ pub async fn main() -> Result<(), IngesterError> {
metrics_state.ingester_metrics.clone(),
index_pg_storage.clone(),
rpc_client.clone(),
changes_tracker.clone(),
mutexed_tasks.clone(),
)
.await;
Expand All @@ -221,6 +233,7 @@ pub async fn main() -> Result<(), IngesterError> {
metrics_state.ingester_metrics.clone(),
index_pg_storage.clone(),
rpc_client.clone(),
changes_tracker.clone(),
mutexed_tasks.clone(),
)
.await;
Expand All @@ -240,6 +253,7 @@ pub async fn main() -> Result<(), IngesterError> {
metrics_state.ingester_metrics.clone(),
index_pg_storage.clone(),
rpc_client.clone(),
changes_tracker.clone(),
mutexed_tasks.clone(),
)
.await;
Expand Down Expand Up @@ -363,6 +377,7 @@ pub async fn main() -> Result<(), IngesterError> {
primary_rocks_storage.clone(),
metrics_state.ingester_metrics.clone(),
buffer.json_tasks.clone(),
Some(changes_tracker.clone()),
));

for _ in 0..config.transactions_parsing_workers {
Expand Down Expand Up @@ -407,6 +422,7 @@ pub async fn main() -> Result<(), IngesterError> {
primary_rocks_storage.clone(),
metrics_state.ingester_metrics.clone(),
buffer.json_tasks.clone(),
Some(changes_tracker.clone()),
));
let tx_ingester = Arc::new(BackfillTransactionIngester::new(backfill_bubblegum_updates_processor.clone()));
let backfiller_config = setup_config::<BackfillerConfig>(INGESTER_CONFIG_PREFIX);
Expand Down Expand Up @@ -689,6 +705,7 @@ pub async fn main() -> Result<(), IngesterError> {
primary_rocks_storage.clone(),
primary_rocks_storage.clone(),
primary_rocks_storage.clone(),
Some(changes_tracker.clone()),
metrics_state.fork_cleaner_metrics.clone(),
);
let rx = shutdown_rx.resubscribe();
Expand Down
16 changes: 16 additions & 0 deletions nft_ingester/src/bin/raw_backfiller/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,9 @@ use nft_ingester::buffer::Buffer;
use nft_ingester::config::{
self, init_logger, setup_config, BackfillerConfig, RawBackfillConfig, INGESTER_CONFIG_PREFIX,
};
use nft_ingester::consistency_calculator;
use nft_ingester::consistency_calculator::NftChangesTracker;
use nft_ingester::consistency_calculator::NTF_CHANGES_NOTIFICATION_QUEUE_SIZE;
use nft_ingester::error::IngesterError;
use nft_ingester::init::graceful_stop;
use nft_ingester::transaction_ingester;
Expand All @@ -19,6 +22,7 @@ use metrics_utils::{BackfillerMetricsConfig, IngesterMetricsConfig};
use rocks_db::bubblegum_slots::BubblegumSlotGetter;
use rocks_db::migrator::MigrationState;
use rocks_db::Storage;
use tokio::sync::mpsc;
use tokio::sync::{broadcast, Mutex};
use tokio::task::JoinSet;

Expand Down Expand Up @@ -139,6 +143,17 @@ pub async fn main() -> Result<(), IngesterError> {
);
let (shutdown_tx, shutdown_rx) = broadcast::channel::<()>(1);

let (nft_change_snd, nft_change_rcv) = mpsc::channel(NTF_CHANGES_NOTIFICATION_QUEUE_SIZE);
let changes_tracker = Arc::new(NftChangesTracker::new(
rocks_storage.clone(),
nft_change_snd.clone(),
));
consistency_calculator::run_bg_consistency_calculator(
nft_change_rcv,
rocks_storage.clone(),
shutdown_rx.resubscribe(),
);

match backfiller_config.backfiller_mode {
config::BackfillerMode::IngestDirectly => {
todo!();
Expand Down Expand Up @@ -174,6 +189,7 @@ pub async fn main() -> Result<(), IngesterError> {
rocks_storage.clone(),
ingester_metrics.clone(),
buffer.json_tasks.clone(),
Some(changes_tracker.clone()),
));

let tx_ingester = Arc::new(transaction_ingester::BackfillTransactionIngester::new(
Expand Down
21 changes: 20 additions & 1 deletion nft_ingester/src/bubblegum_updates_processor.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
use crate::consistency_calculator::NftChangesTracker;
use crate::error::IngesterError;
use crate::flatbuffer_mapper::FlatbufferMapper;
use crate::plerkle;
Expand Down Expand Up @@ -60,6 +61,7 @@ pub struct BubblegumTxProcessor {
pub transaction_parser: Arc<FlatbufferMapper>,
pub instruction_parser: Arc<BubblegumParser>,
pub rocks_client: Arc<rocks_db::Storage>,
pub nft_change_tracker: Option<Arc<NftChangesTracker>>,

pub json_tasks: Arc<Mutex<VecDeque<Task>>>,
pub metrics: Arc<IngesterMetricsConfig>,
Expand All @@ -70,11 +72,13 @@ impl BubblegumTxProcessor {
rocks_client: Arc<rocks_db::Storage>,
metrics: Arc<IngesterMetricsConfig>,
json_tasks: Arc<Mutex<VecDeque<Task>>>,
nft_change_tracker: Option<Arc<NftChangesTracker>>,
) -> Self {
BubblegumTxProcessor {
transaction_parser: Arc::new(FlatbufferMapper {}),
instruction_parser: Arc::new(BubblegumParser {}),
rocks_client,
nft_change_tracker,
json_tasks,
metrics,
}
Expand Down Expand Up @@ -1173,8 +1177,23 @@ impl BubblegumTxProcessor {
Ok(())
}

/// Checks if the given instruction is a late instruction, belongs to a previous epoch,
/// and emits notification a notification, that will force the checksum calculator component
/// to recalculate epoch checksum.
///
/// Note: this function only sends the notification, the saving of bubblgum changes
/// happens in [Storage::store_instruction_result_with_batch] by calling
/// [Storage::track_tree_change_with_batch]
async fn calc_checksums_if_needed(&self, instructions: &[InstructionResult]) {
// TODO: if there was a change with a slot from the previous epoch, **emit** checksum recalc
if let Some(nft_change_tracker) = self.nft_change_tracker.as_ref() {
for ix in instructions {
if let Some(tree_update) = ix.tree_update.as_ref() {
nft_change_tracker
.watch_bubblegum_change(tree_update.tree, tree_update.slot)
.await;
}
}
}
}
}

Expand Down
Loading

0 comments on commit 23cb90a

Please sign in to comment.