Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[MTG-777] Switching between storage instances #295

Draft
wants to merge 9 commits into
base: main
Choose a base branch
from
3 changes: 2 additions & 1 deletion docker-compose.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,8 @@ services:
network_mode: host
volumes:
- ${API_ROCKS_DB_PATH_CONTAINER}:${API_ROCKS_DB_PATH_CONTAINER}:ro
- ${API_ROCKS_DB_SECONDARY_PATH_CONTAINER}:${API_ROCKS_DB_SECONDARY_PATH_CONTAINER}:rw
- ${API_ROCKS_DB_SECONDARY_FIRST_PATH_CONTAINER}:${API_ROCKS_DB_SECONDARY_FIRST_PATH_CONTAINER}:rw
- ${API_ROCKS_DB_SECONDARY_SECOND_PATH_CONTAINER}:${API_ROCKS_DB_SECONDARY_SECOND_PATH_CONTAINER}:rw
- ${API_ARCHIVES_DIR}:${API_ARCHIVES_DIR}:ro
- ${API_FILE_STORAGE_PATH}:${API_FILE_STORAGE_PATH_CONTAINER}:rw
- ./heaps:/usr/src/app/heaps:rw
Expand Down
9 changes: 7 additions & 2 deletions nft_ingester/benches/integrated_benchmark.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,9 @@
use criterion::{criterion_group, criterion_main, Criterion};
use entities::api_req_params::SearchAssets;
use nft_ingester::{api::middleware::JsonDownloaderMiddleware, index_syncronizer::Synchronizer};
use nft_ingester::{
api::middleware::JsonDownloaderMiddleware, index_syncronizer::Synchronizer,
rocks_db::RocksDbManager,
};
use rocks_db::storage_traits::AssetIndexReader;
use setup::TestEnvironment;
use std::sync::Arc;
Expand Down Expand Up @@ -35,9 +38,11 @@ fn search_assets_benchmark(c: &mut Criterion) {
let limit: u32 = 1000; // Number of records to fetch
let rt = tokio::runtime::Runtime::new().unwrap();
let (env, _generated_assets) = rt.block_on(setup_environment(&cli));
let rocks_db = RocksDbManager::new_primary(env.rocks_env.storage.clone()).into();

let api = nft_ingester::api::api_impl::DasApi::new(
env.pg_env.client.clone(),
env.rocks_env.storage.clone(),
rocks_db,
Arc::new(ApiMetricsConfig::new()),
None,
100,
Expand Down
27 changes: 14 additions & 13 deletions nft_ingester/src/api/api_impl.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ use crate::api::dapi::rpc_asset_models::Asset;
use crate::api::error::DasApiError;
use crate::api::*;
use crate::config::JsonMiddlewareConfig;
use crate::rocks_db::RocksDbManager;
use dapi::get_asset_signatures::get_asset_signatures;
use dapi::get_core_fees::get_core_fees;
use dapi::get_token_accounts::get_token_accounts;
Expand Down Expand Up @@ -47,7 +48,7 @@ where
PPC: ProcessingPossibilityChecker + Sync + Send + 'static,
{
pub(crate) pg_client: Arc<PgClient>,
rocks_db: Arc<Storage>,
rocks_db: Arc<RocksDbManager>,
metrics: Arc<ApiMetricsConfig>,
proof_checker: Option<Arc<PC>>,
tree_gaps_checker: Option<Arc<PPC>>,
Expand Down Expand Up @@ -77,7 +78,7 @@ where
#[allow(clippy::too_many_arguments)]
pub fn new(
pg_client: Arc<PgClient>,
rocks_db: Arc<Storage>,
rocks_db: Arc<RocksDbManager>,
metrics: Arc<ApiMetricsConfig>,
proof_checker: Option<Arc<PC>>,
tree_gaps_checker: Option<Arc<PPC>>,
Expand Down Expand Up @@ -244,7 +245,7 @@ where

let id = validate_pubkey(payload.id.clone())?;
let assets = get_proof_for_assets(
self.rocks_db.clone(),
self.rocks_db.acquire(),
vec![id],
self.proof_checker.clone(),
&self.tree_gaps_checker,
Expand Down Expand Up @@ -284,7 +285,7 @@ where
.collect::<Result<Vec<_>, _>>()?;

let res = get_proof_for_assets(
self.rocks_db.clone(),
self.rocks_db.acquire(),
ids,
self.proof_checker.clone(),
&self.tree_gaps_checker,
Expand All @@ -311,7 +312,7 @@ where
let options = payload.options.unwrap_or_default();

let res = get_asset(
self.rocks_db.clone(),
self.rocks_db.acquire(),
id,
options,
self.json_downloader.clone(),
Expand Down Expand Up @@ -359,7 +360,7 @@ where
let options = payload.options.unwrap_or_default();

let res = get_asset_batch(
self.rocks_db.clone(),
self.rocks_db.acquire(),
ids,
options,
self.json_downloader.clone(),
Expand Down Expand Up @@ -391,7 +392,7 @@ where
let res = self
.process_request(
self.pg_client.clone(),
self.rocks_db.clone(),
self.rocks_db.acquire(),
payload,
tasks,
)
Expand All @@ -415,7 +416,7 @@ where
let res = self
.process_request(
self.pg_client.clone(),
self.rocks_db.clone(),
self.rocks_db.acquire(),
payload,
tasks,
)
Expand All @@ -439,7 +440,7 @@ where
let res = self
.process_request(
self.pg_client.clone(),
self.rocks_db.clone(),
self.rocks_db.acquire(),
payload,
tasks,
)
Expand All @@ -463,7 +464,7 @@ where
let res = self
.process_request(
self.pg_client.clone(),
self.rocks_db.clone(),
self.rocks_db.acquire(),
payload,
tasks,
)
Expand Down Expand Up @@ -514,7 +515,7 @@ where
}

let res = get_token_accounts(
self.rocks_db.clone(),
self.rocks_db.acquire(),
owner,
mint,
limit.unwrap_or(DEFAULT_LIMIT as u32).into(),
Expand Down Expand Up @@ -583,7 +584,7 @@ where
let res = self
.process_request(
self.pg_client.clone(),
self.rocks_db.clone(),
self.rocks_db.acquire(),
payload,
tasks,
)
Expand Down Expand Up @@ -639,7 +640,7 @@ where
Self::validate_basic_pagination(&pagination, self.max_page_limit)?;

let res = get_asset_signatures(
self.rocks_db.clone(),
self.rocks_db.acquire(),
id,
tree,
leaf_index,
Expand Down
7 changes: 4 additions & 3 deletions nft_ingester/src/api/service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ use usecase::proofs::MaybeProofChecker;
use uuid::Uuid;

use crate::api::backfilling_state_consistency::BackfillingStateConsistencyChecker;
use crate::rocks_db::RocksDbManager;
use interface::consistency_check::ConsistencyChecker;
use metrics_utils::ApiMetricsConfig;
use rocks_db::Storage;
Expand Down Expand Up @@ -50,7 +51,7 @@ pub(crate) struct MiddlewaresData {
#[allow(clippy::too_many_arguments)]
pub async fn start_api(
pg_client: Arc<PgClient>,
rocks_db: Arc<Storage>,
rocks_db: Arc<RocksDbManager>,
rx: Receiver<()>,
metrics: Arc<ApiMetricsConfig>,
port: u16,
Expand Down Expand Up @@ -84,7 +85,7 @@ pub async fn start_api(
tasks.clone(),
rx.resubscribe(),
pg_client.clone(),
rocks_db.clone(),
rocks_db.acquire(),
consistence_synchronization_api_threshold,
)
.await;
Expand All @@ -98,7 +99,7 @@ pub async fn start_api(
.run(
tasks.clone(),
rx.resubscribe(),
rocks_db.clone(),
rocks_db.acquire(),
consistence_backfilling_slots_threshold,
)
.await;
Expand Down
56 changes: 33 additions & 23 deletions nft_ingester/src/bin/api/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ use nft_ingester::config::{init_logger, setup_config, ApiConfig};
use nft_ingester::error::IngesterError;
use nft_ingester::init::graceful_stop;
use nft_ingester::json_worker::JsonWorker;
use nft_ingester::rocks_db::RocksDbManager;
use prometheus_client::registry::Registry;
use tracing::{error, info};

Expand All @@ -26,7 +27,8 @@ use usecase::proofs::MaybeProofChecker;
static GLOBAL: jemallocator::Jemalloc = jemallocator::Jemalloc;

pub const DEFAULT_ROCKSDB_PATH: &str = "./my_rocksdb";
pub const DEFAULT_SECONDARY_ROCKSDB_PATH: &str = "./my_rocksdb_secondary";
pub const DEFAULT_SECONDARY_FIRST_ROCKSDB_PATH_CONTAINER: &str = "./my_rocksdb_secondary_first";
pub const DEFAULT_SECONDARY_SECOND_ROCKSDB_PATH_CONTAINER: &str = "./my_rocksdb_secondary_second";

#[tokio::main(flavor = "multi_thread")]
pub async fn main() -> Result<(), IngesterError> {
Expand Down Expand Up @@ -84,24 +86,39 @@ pub async fn main() -> Result<(), IngesterError> {
.rocks_db_path_container
.clone()
.unwrap_or(DEFAULT_ROCKSDB_PATH.to_string());
let secondary_storage_path = config
.rocks_db_secondary_path_container

let secondary_storage_first_path = config
.rocks_db_secondary_first_path_container
.clone()
.unwrap_or(DEFAULT_SECONDARY_ROCKSDB_PATH.to_string());
let storage = Storage::open_secondary(
.unwrap_or(DEFAULT_SECONDARY_FIRST_ROCKSDB_PATH_CONTAINER.to_string());
let secondary_storage_first = Storage::open_secondary(
&primary_storage_path,
&secondary_storage_path,
&secondary_storage_first_path,
mutexed_tasks.clone(),
red_metrics.clone(),
MigrationState::Last,
)
.unwrap();

let rocks_storage = Arc::new(storage);
let secondary_storage_second_path = config
.rocks_db_secondary_first_path_container
.clone()
.unwrap_or(DEFAULT_SECONDARY_SECOND_ROCKSDB_PATH_CONTAINER.to_string());
let secondary_storage_second_path = Storage::open_secondary(
&primary_storage_path,
&secondary_storage_second_path,
mutexed_tasks.clone(),
red_metrics.clone(),
MigrationState::Last,
)
.unwrap();
let rocks_db_manager = Arc::new(RocksDbManager::new_secondary(
secondary_storage_first,
secondary_storage_second_path,
));

let rpc_client = Arc::new(RpcClient::new(config.rpc_host));
let account_balance_getter = Arc::new(AccountBalanceGetterImpl::new(rpc_client.clone()));
let cloned_rocks_storage = rocks_storage.clone();
let mut proof_checker = None;
if config.check_proofs {
proof_checker = Some(Arc::new(MaybeProofChecker::new(
Expand All @@ -117,7 +134,7 @@ pub async fn main() -> Result<(), IngesterError> {
Some(Arc::new(
JsonWorker::new(
pg_client.clone(),
rocks_storage.clone(),
rocks_db_manager.clone(),
json_downloader_metrics.clone(),
)
.await,
Expand All @@ -136,17 +153,18 @@ pub async fn main() -> Result<(), IngesterError> {
if config.skip_check_tree_gaps {
None
} else {
Some(cloned_rocks_storage.clone())
Some(rocks_db_manager.acquire())
}
};

let (shutdown_tx, shutdown_rx) = broadcast::channel::<()>(1);
let cloned_tasks = mutexed_tasks.clone();
let cloned_rx = shutdown_rx.resubscribe();
let rocks_db_manager_clone = rocks_db_manager.clone();
mutexed_tasks.lock().await.spawn(async move {
match start_api(
pg_client.clone(),
cloned_rocks_storage.clone(),
rocks_db_manager_clone,
cloned_rx,
metrics.clone(),
config.server_port,
Expand Down Expand Up @@ -179,16 +197,16 @@ pub async fn main() -> Result<(), IngesterError> {
// setup dependencies for grpc server
let uc = usecase::asset_streamer::AssetStreamer::new(
config.peer_grpc_max_gap_slots,
rocks_storage.clone(),
rocks_db_manager.acquire(),
);
let bs = usecase::raw_blocks_streamer::BlocksStreamer::new(
config.peer_grpc_max_gap_slots,
rocks_storage.clone(),
rocks_db_manager.acquire(),
);
let serv = grpc::service::PeerGapFillerServiceImpl::new(
Arc::new(uc),
Arc::new(bs),
rocks_storage.clone(),
rocks_db_manager.acquire(),
);
let addr = format!("0.0.0.0:{}", config.peer_grpc_port).parse()?;
let mut cloned_rx = shutdown_rx.resubscribe();
Expand All @@ -206,17 +224,9 @@ pub async fn main() -> Result<(), IngesterError> {
Ok(())
});

// try synchronizing secondary rocksdb instance every config.rocks_sync_interval_seconds
let cloned_rx = shutdown_rx.resubscribe();
let cloned_rocks_storage = rocks_storage.clone();
let dur = tokio::time::Duration::from_secs(config.rocks_sync_interval_seconds);
mutexed_tasks.lock().await.spawn(async move {
while cloned_rx.is_empty() {
if let Err(e) = cloned_rocks_storage.db.try_catch_up_with_primary() {
error!("Sync rocksdb error: {}", e);
}
tokio::time::sleep(dur).await;
}
rocks_db_manager.catch_up(cloned_rx).await;

Ok(())
});
Expand Down
Loading
Loading