MTG-750 Adding bubblegum checksums storage
snorochevskiy committed Oct 22, 2024
1 parent 92ae9cd commit 0c84d5c
Showing 13 changed files with 921 additions and 1 deletion.
1 change: 1 addition & 0 deletions nft_ingester/src/bin/ingester/main.rs
@@ -686,6 +686,7 @@ pub async fn main() -> Result<(), IngesterError> {
}

let fork_cleaner = ForkCleaner::new(
primary_rocks_storage.clone(),
primary_rocks_storage.clone(),
primary_rocks_storage.clone(),
metrics_state.fork_cleaner_metrics.clone(),
7 changes: 7 additions & 0 deletions nft_ingester/src/bubblegum_updates_processor.rs
@@ -112,6 +112,9 @@ impl BubblegumTxProcessor {
.await
.map_err(|e| IngesterError::DatabaseError(e.to_string()));

self.calc_checksums_if_needed(&result.instruction_results)
.await;

result_to_metrics(self.metrics.clone(), &res, "process_transaction");
self.metrics.set_latency(
"process_transaction",
@@ -1169,6 +1172,10 @@ impl BubblegumTxProcessor {

Ok(())
}

async fn calc_checksums_if_needed(&self, instructions: &[InstructionResult]) {
// TODO: if there was a change with a slot from the previous epoch, emit a checksum recalculation
}
}

fn use_method_from_mpl_bubblegum_state(
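The new calc_checksums_if_needed hook is still a stub; a rough sketch of what the TODO might turn into is shown below. The slot/tree accessors on InstructionResult and the calc_sender field are assumptions for illustration and are not part of this commit; current_estimated_epoch, epoch_of_slot, and ConsistencyCalcMsg::BubblegumUpdated do exist in rocks_db::storage_consistency and the new consistency_calculator module.

async fn calc_checksums_if_needed(&self, instructions: &[InstructionResult]) {
    let current_epoch = current_estimated_epoch();
    for ix in instructions {
        // Hypothetical accessors: the slot the change landed in and the tree it touched.
        let (slot, tree) = (ix.slot(), ix.tree());
        if epoch_of_slot(slot) < current_epoch {
            // A change arrived for an already-closed epoch, so ask the background
            // consistency calculator to recompute checksums for that tree/slot.
            let _ = self
                .calc_sender
                .send(ConsistencyCalcMsg::BubblegumUpdated { tree, slot });
        }
    }
}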
199 changes: 199 additions & 0 deletions nft_ingester/src/consistency_calculator.rs
@@ -0,0 +1,199 @@
use rocks_db::{
column::TypedColumn,
storage_consistency::{
self, grand_epoch_of_epoch, BubblegumChange, BubblegumChangeKey, BubblegumEpoch,
BubblegumEpochKey, BubblegumGrandEpoch, BubblegumGrandEpochKey,
},
Storage,
};
use solana_sdk::{hash::Hasher, pubkey::Pubkey};
use std::{sync::Arc, time::Duration};
use storage_consistency::{BUBBLEGUM_EPOCH_CALCULATING, BUBBLEGUM_GRAND_EPOCH_CALCULATING};
use tokio::{sync::mpsc::UnboundedReceiver, task::JoinHandle};

pub enum ConsistencyCalcMsg {
EpochChanged { epoch: u32 },
BubblegumUpdated { tree: Pubkey, slot: u64 },
AccUpdated { account: Pubkey, slot: u64 },
}

/// An entry point for the checksums calculation component.
/// Should be called from `main`.
pub async fn run_bg_consistency_calculator(
mut rcv: UnboundedReceiver<ConsistencyCalcMsg>,
storage: Arc<Storage>,
) {
tokio::spawn(async move {
let mut _bubblegum_task = None;

// Note: this dispatch loop is only a skeleton for now; most handlers are still to be implemented
loop {
match rcv.recv().await {
Some(msg) => match msg {
ConsistencyCalcMsg::EpochChanged { epoch } => {
// TODO: schedule the epoch calculation. Should it run immediately, or wait, since more data may still arrive?
let t = schedule_bublegum_calc(
Duration::from_secs(300),
storage.clone(),
epoch,
);
_bubblegum_task = Some(t);
}
ConsistencyCalcMsg::BubblegumUpdated { tree: _, slot: _ } => todo!(),
ConsistencyCalcMsg::AccUpdated {
account: _,
slot: _,
} => todo!(),
},
None => break,
}
}
});
}
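A minimal sketch of how `main` might wire this up. Only run_bg_consistency_calculator and ConsistencyCalcMsg come from this commit; the channel names and surrounding setup are illustrative.

// Somewhere in the ingester's main, after primary_rocks_storage (Arc<Storage>) is opened:
let (calc_tx, calc_rx) = tokio::sync::mpsc::unbounded_channel::<ConsistencyCalcMsg>();
run_bg_consistency_calculator(calc_rx, primary_rocks_storage.clone()).await;
// calc_tx is then handed to whatever detects epoch boundaries or bubblegum updates,
// e.g. a SolanaSlotService instance or the bubblegum transaction processor.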

/// After a given lag, launches checksum calculation for the given epoch.
///
/// ## Args:
/// * `start_lag` - time to wait before actual calculation.
///   Required because we might want to wait for late updates.
/// * `storage` - database
/// * `epoch` - the epoch the calculation should be done for
fn schedule_bublegum_calc(
start_lag: Duration,
storage: Arc<Storage>,
epoch: u32,
) -> JoinHandle<()> {
tokio::spawn(async move {
tokio::time::sleep(start_lag).await;
calc_bubblegum_checksums(&storage, epoch).await;
})
}

pub async fn calc_bubblegum_checksums(storage: &Storage, epoch: u32) {
calc_bubblegum_epoch(&storage, epoch).await;
calc_bubblegum_grand_epoch(&storage, grand_epoch_of_epoch(epoch)).await;
}

/// Calculates and stores bubblegum epoch checksums for bubblegum updates
/// received during the given epoch.
///
/// ## Args:
/// * `storage` - database
/// * `target_epoch` - the epoch number the checksum should be calculated for
async fn calc_bubblegum_epoch(storage: &Storage, target_epoch: u32) {
// iterate over changes and calculate checksum per tree.

let mut current_tree: Option<Pubkey> = None;

let mut it = storage
.bubblegum_changes
.iter(BubblegumChangeKey::epoch_start_key(target_epoch));
let mut hasher = Hasher::default();

while let Some(Ok((k, v))) = it.next() {
let Ok(change_key) = BubblegumChange::decode_key(k.to_vec()) else {
continue;
};
if change_key.epoch > target_epoch {
break;
}
if current_tree != Some(change_key.tree_pubkey) {
if current_tree.is_some() {
// write checksum for previous tree
let epoch_key = BubblegumEpochKey::new(current_tree.unwrap(), target_epoch);

let epoch_val = BubblegumEpoch::from(hasher.result().to_bytes());
let _ = storage
.bubblegum_epochs
.merge(epoch_key.clone(), epoch_val)
.await;

if let Ok(Some(storage_consistency::BUBBLEGUM_EPOCH_INVALIDATED)) =
storage.bubblegum_epochs.get(epoch_key)
{
// TODO: Means more changes for the tree have come while the epoch checksum calculation was in progress.
// How to handle?
}
}
current_tree = Some(change_key.tree_pubkey);

let new_epoch_key = BubblegumEpochKey::new(current_tree.unwrap(), target_epoch);
storage
.bubblegum_epochs
.put(new_epoch_key, BUBBLEGUM_EPOCH_CALCULATING);

hasher = Hasher::default();
}
hasher.hash(&k);
hasher.hash(&v);
}

if let Some(current_tree) = current_tree {
let epoch_key = BubblegumEpochKey {
tree_pubkey: current_tree,
epoch_num: target_epoch,
};
let epoch_val = BubblegumEpoch::from(hasher.result().to_bytes());
let _ = storage.bubblegum_epochs.merge(epoch_key, epoch_val).await;
}
}

async fn calc_bubblegum_grand_epoch(storage: &Storage, target_grand_epoch: u16) {
let mut current_tree: Option<Pubkey> = None;

let mut it = storage
.bubblegum_epochs
.iter(BubblegumEpochKey::grand_epoch_start_key(target_grand_epoch));
let mut hasher = Hasher::default();

while let Some(Ok((k, v))) = it.next() {
let Ok(epoch_key) = BubblegumEpoch::decode_key(k.to_vec()) else {
continue;
};
let element_grand_epoch = grand_epoch_of_epoch(epoch_key.epoch_num);
if element_grand_epoch > target_grand_epoch {
break;
}
if current_tree != Some(epoch_key.tree_pubkey) {
if current_tree.is_some() {
// write checksum for previous tree
let grand_epoch_key =
BubblegumGrandEpochKey::new(current_tree.unwrap(), target_grand_epoch);
let grand_epoch_val = BubblegumGrandEpoch::from(hasher.result().to_bytes());
let _ = storage
.bubblegum_grand_epochs
.merge(grand_epoch_key.clone(), grand_epoch_val)
.await;

if let Ok(Some(storage_consistency::BUBBLEGUM_GRAND_EPOCH_INVALIDATED)) =
storage.bubblegum_grand_epochs.get(grand_epoch_key)
{
// TODO: ???
}
}
current_tree = Some(epoch_key.tree_pubkey);

let new_grand_epoch_key =
BubblegumGrandEpochKey::new(current_tree.unwrap(), target_grand_epoch);
let _ = storage
.bubblegum_grand_epochs
.put(new_grand_epoch_key, BUBBLEGUM_GRAND_EPOCH_CALCULATING);

hasher = Hasher::default();
}
hasher.hash(&k);
hasher.hash(&v);
}

if let Some(current_tree) = current_tree {
let grand_epoch_key = BubblegumGrandEpochKey {
tree_pubkey: current_tree,
grand_epoch_num: target_grand_epoch,
};
let grand_epoch_val = BubblegumGrandEpoch::from(hasher.result().to_bytes());
let _ = storage
.bubblegum_grand_epochs
.merge(grand_epoch_key, grand_epoch_val)
.await;
}
}
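For context, once calc_bubblegum_epoch has stored a per-tree checksum, a consumer could read it back through the same column. A minimal illustrative sketch follows; tree_pubkey and the epoch number are placeholders, and this snippet is not part of the commit.

// Illustrative read-back of a per-tree epoch checksum.
let key = BubblegumEpochKey::new(tree_pubkey, 41);
if let Ok(Some(_epoch_checksum)) = storage.bubblegum_epochs.get(key) {
    // The value may still be BUBBLEGUM_EPOCH_CALCULATING, or an invalidated marker
    // if new changes kept arriving; otherwise it holds the final checksum bytes.
}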
20 changes: 19 additions & 1 deletion nft_ingester/src/fork_cleaner.rs
@@ -1,6 +1,8 @@
use entities::models::ForkedItem;
use interface::fork_cleaner::{CompressedTreeChangesManager, ForkChecker};
use metrics_utils::ForkCleanerMetricsConfig;
use rocks_db::storage_consistency::BubblegumChangeKey;
use rocks_db::storage_consistency::DataConsistencyStorage;
use rocks_db::Storage;
use solana_sdk::pubkey::Pubkey;
use solana_sdk::signature::Signature;
@@ -46,6 +48,7 @@ where
{
cl_items_manager: Arc<CM>,
fork_checker: Arc<FC>,
data_consistency_storage: Arc<dyn DataConsistencyStorage + Send + Sync>,
metrics: Arc<ForkCleanerMetricsConfig>,
}

@@ -57,11 +60,13 @@ where
pub fn new(
cl_items_manager: Arc<CM>,
fork_checker: Arc<FC>,
data_consistency_storage: Arc<dyn DataConsistencyStorage + Send + Sync>,
metrics: Arc<ForkCleanerMetricsConfig>,
) -> Self {
Self {
cl_items_manager,
fork_checker,
data_consistency_storage,
metrics,
}
}
@@ -75,6 +80,7 @@ where

let mut forked_slots = 0;
let mut delete_items = Vec::new();
let mut changes_to_delete = Vec::new();

let mut signatures_to_drop = Vec::new();

@@ -133,7 +139,7 @@ where
// dropping only sequence 5 would result in an incorrect update during backfill.
// therefore, we need to drop sequence 4 as well. Sequence 5 must be dropped because
// it contains a different tree update in the main branch
for sequences in signature.slot_sequences.values() {
for (slot, sequences) in signature.slot_sequences.iter() {
for seq in sequences {
delete_items.push(ForkedItem {
tree: signature.tree,
@@ -142,6 +148,12 @@ where
// because deletion will happen by tree and seq values
node_idx: 0,
});

changes_to_delete.push(BubblegumChangeKey::new(
signature.tree,
*slot,
*seq,
));
}
}
}
@@ -150,11 +162,17 @@ where
}

if delete_items.len() >= CI_ITEMS_DELETE_BATCH_SIZE {
self.data_consistency_storage
.drop_forked_bubblegum_changes(&changes_to_delete)
.await;
self.delete_tree_seq_idx(&mut delete_items).await;
}
}

if !delete_items.is_empty() {
self.data_consistency_storage
.drop_forked_bubblegum_changes(&changes_to_delete)
.await;
self.delete_tree_seq_idx(&mut delete_items).await;
}

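The ForkCleaner now depends on the DataConsistencyStorage trait from rocks_db::storage_consistency. Its definition is not shown in this diff, but from the call sites above it presumably has roughly the following shape; this is a reconstruction, not the actual code.

#[async_trait::async_trait]
pub trait DataConsistencyStorage {
    /// Drop bubblegum change records that belong to forked (abandoned) slots,
    /// so they do not feed into epoch checksum calculation.
    async fn drop_forked_bubblegum_changes(&self, changes: &[BubblegumChangeKey]);
}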
2 changes: 2 additions & 0 deletions nft_ingester/src/lib.rs
@@ -6,6 +6,7 @@ pub mod batch_mint;
pub mod bubblegum_updates_processor;
pub mod buffer;
pub mod config;
pub mod consistency_calculator;
pub mod error;
pub mod flatbuffer_mapper;
pub mod fork_cleaner;
@@ -27,6 +28,7 @@ pub mod redis_receiver;
pub mod rocks_db;
pub mod scheduler;
pub mod sequence_consistent;
pub mod solana_slot_service;
pub mod tcp_receiver;
pub mod token_updates_processor;
pub mod transaction_ingester;
50 changes: 50 additions & 0 deletions nft_ingester/src/solana_slot_service.rs
@@ -0,0 +1,50 @@
use std::{sync::Arc, time::Duration};

use rocks_db::storage_consistency::{
current_estimated_epoch, epoch_of_slot, slots_to_next_epoch, update_estimated_epoch,
};
use solana_client::nonblocking::rpc_client::RpcClient;

use crate::consistency_calculator::ConsistencyCalcMsg;

const SLOT_TIME: u64 = 400;

/// Background job that monitors the current Solana slot number
/// and notifies downstream components about epoch changes.
/// This component is an __optional__ part of the p2p consistency checking mechanism.
pub struct SolanaSlotService {
rpc_client: Arc<RpcClient>,
epoch_changed_notifier: tokio::sync::mpsc::UnboundedSender<ConsistencyCalcMsg>,
}

const MIN_LAG_TO_CHECK_SLOT: u64 = 60;

impl SolanaSlotService {
async fn run_epoch_changed_notifier(&self) {
loop {
let estimated_epoch = current_estimated_epoch();

let Ok(solana_slot) = self.rpc_client.get_slot().await else {
tokio::time::sleep(Duration::from_secs(MIN_LAG_TO_CHECK_SLOT)).await;
continue;
};
let current_epoch = epoch_of_slot(solana_slot);

if current_epoch > estimated_epoch {
update_estimated_epoch(current_epoch);
let _ = self
.epoch_changed_notifier
.send(ConsistencyCalcMsg::EpochChanged {
epoch: current_epoch,
});
} else {
let wait_more_slots = slots_to_next_epoch(solana_slot);
let mut seconds_to_wait = (wait_more_slots * SLOT_TIME) / 1000;
if seconds_to_wait < MIN_LAG_TO_CHECK_SLOT {
seconds_to_wait += MIN_LAG_TO_CHECK_SLOT;
}
tokio::time::sleep(Duration::from_secs(seconds_to_wait)).await;
}
}
}
}
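The service has no public constructor or spawn helper yet; a sketch of how it might be started from code that owns it is below. The field initialization assumes the private fields stay as declared, the RPC URL is a placeholder, and calc_tx is the sender half paired with run_bg_consistency_calculator - none of this wiring is part of the commit.

// Hypothetical wiring, e.g. from a future pub constructor/start helper in this module.
let rpc_client = Arc::new(RpcClient::new("https://api.mainnet-beta.solana.com".to_string()));
let service = SolanaSlotService {
    rpc_client,
    epoch_changed_notifier: calc_tx,
};
tokio::spawn(async move { service.run_epoch_changed_notifier().await });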
2 changes: 2 additions & 0 deletions nft_ingester/tests/clean_forks_test.rs
@@ -641,6 +641,7 @@ async fn test_clean_forks() {
let (_shutdown_tx, shutdown_rx) = broadcast::channel::<()>(1);
let rx = shutdown_rx.resubscribe();
let fork_cleaner = ForkCleaner::new(
storage.clone(),
storage.clone(),
storage.clone(),
metrics_state.fork_cleaner_metrics.clone(),
@@ -970,6 +971,7 @@ async fn test_process_forked_transaction() {
let (_shutdown_tx, shutdown_rx) = broadcast::channel::<()>(1);

let fork_cleaner = ForkCleaner::new(
storage.clone(),
storage.clone(),
storage.clone(),
metrics_state.fork_cleaner_metrics.clone(),