diff --git a/Cargo.lock b/Cargo.lock index 7e615708..1c85a161 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -13915,6 +13915,7 @@ version = "0.1.0" dependencies = [ "async-trait", "axum", + "blockstore", "cid 0.11.1", "clap", "ed25519-dalek", @@ -13925,8 +13926,10 @@ dependencies = [ "libp2p 0.54.1", "mater", "parity-scale-codec", + "polka-index", "polka-storage-proofs", "polka-storage-provider-common", + "polka-storage-retrieval", "primitives", "rand", "rocksdb", diff --git a/storage-provider/server/Cargo.toml b/storage-provider/server/Cargo.toml index 820d185d..25475212 100644 --- a/storage-provider/server/Cargo.toml +++ b/storage-provider/server/Cargo.toml @@ -14,13 +14,16 @@ delia = [] [dependencies] # "Homegrown" crates mater = { workspace = true } +polka-index = { workspace = true } polka-storage-proofs = { workspace = true, features = ["std", "substrate"] } polka-storage-provider-common = { workspace = true } +polka-storage-retrieval = { workspace = true } primitives = { workspace = true, features = ["clap", "serde", "std"] } storagext = { workspace = true, features = ["clap"] } async-trait = { workspace = true } axum = { workspace = true, features = ["macros", "multipart"] } +blockstore = { workspace = true } cid = { workspace = true, features = ["serde", "std"] } clap = { workspace = true, features = ["derive"] } codec = { workspace = true } diff --git a/storage-provider/server/src/config.rs b/storage-provider/server/src/config.rs index 9b72ae5c..35b60155 100644 --- a/storage-provider/server/src/config.rs +++ b/storage-provider/server/src/config.rs @@ -35,6 +35,11 @@ fn default_node_address() -> Url { Url::parse(DEFAULT_NODE_ADDRESS).expect("DEFAULT_NODE_ADDRESS must be a valid Url") } +fn default_retrieval_address() -> Multiaddr { + format!("/ip4/127.0.0.1/tcp/8002") + .parse() + .expect("multiaddres is correct") +} #[derive(Debug, Clone, Deserialize, Args)] #[group(multiple = true, conflicts_with = "config")] #[serde(deny_unknown_fields)] @@ -54,6 +59,11 @@ pub struct ConfigurationArgs { #[arg(long, default_value_t = default_node_address())] pub(crate) node_url: Url, + /// Storage provider listen address. + #[serde(default = "default_retrieval_address")] + #[arg(long, default_value_t = default_retrieval_address())] + pub(crate) retrieval_listen_address: Multiaddr, + /// RocksDB storage directory. /// Defaults to a temporary random directory, like `/tmp//deals_database`. #[arg(long)] diff --git a/storage-provider/server/src/main.rs b/storage-provider/server/src/main.rs index 388d841a..4bc856c4 100644 --- a/storage-provider/server/src/main.rs +++ b/storage-provider/server/src/main.rs @@ -6,10 +6,11 @@ mod config; mod db; mod p2p; mod pipeline; +mod retrieval; mod rpc; mod storage; -use std::{env::temp_dir, net::SocketAddr, path::PathBuf, sync::Arc, time::Duration}; +use std::{env::temp_dir, net::SocketAddr, ops::Deref, path::PathBuf, sync::Arc, time::Duration}; use clap::Parser; use libp2p::{identity::Keypair, Multiaddr, PeerId}; @@ -18,6 +19,7 @@ use p2p::{ RegisterConfig, }; use pipeline::types::PipelineMessage; +use polka_index::local_index_directory::rdb::{RocksDBLid, RocksDBStateStoreConfig}; use polka_storage_proofs::{ porep::{self, PoRepParameters}, post::{self, PoStParameters}, @@ -25,6 +27,7 @@ use polka_storage_proofs::{ use polka_storage_provider_common::rpc::ServerInfo; use primitives::proofs::{RegisteredPoStProof, RegisteredSealProof}; use rand::Rng; +use retrieval::{start_retrieval, RetrievalServerConfig}; use storagext::{ multipair::{MultiPairArgs, MultiPairSigner}, runtime::runtime_types::{ @@ -72,6 +75,9 @@ pub(crate) const SEALED_SECTOR_DIRECTORY_NAME: &str = "sealed"; /// Name for the directory where the sealing cache is kept. pub(crate) const SEALING_CACHE_DIRECTORY_NANE: &str = "cache"; +/// Name of the directory where the index is kept. +pub(crate) const INDEXER_DIRECTORY_NAME: &str = "index"; + fn get_random_temporary_folder() -> PathBuf { temp_dir().join( rand::thread_rng() @@ -88,6 +94,7 @@ struct SetupOutput { pipeline_state: PipelineState, pipeline_rx: UnboundedReceiver, p2p_state: P2PState, + retrieval_config: RetrievalServerConfig, } fn main() -> Result<(), ServerError> { @@ -176,6 +183,12 @@ pub enum ServerError { #[error(transparent)] P2P(#[from] P2PError), + + #[error(transparent)] + RetrievalServer(#[from] polka_storage_retrieval::server::ServerError), + + #[error(transparent)] + Lid(#[from] polka_index::local_index_directory::LidError), } /// Takes an expression that returns a `Result, E1>`. @@ -232,6 +245,9 @@ pub struct Server { /// Parachain node RPC url. node_url: Url, + /// Storage provider listen address. + retrieval_listen_address: Multiaddr, + /// Storage provider key pair. multi_pair_signer: MultiPairSigner, @@ -353,6 +369,7 @@ impl TryFrom for Server { p2p_key: args.p2p_key, rendezvous_point_address: args.rendezvous_point_address, rendezvous_point: args.rendezvous_point, + retrieval_listen_address: args.retrieval_listen_address, }) } } @@ -365,6 +382,7 @@ impl Server { pipeline_state, pipeline_rx, p2p_state, + retrieval_config, } = self.setup().await?; let cancellation_token = CancellationToken::new(); @@ -384,6 +402,11 @@ impl Server { cancellation_token.child_token(), )); + let retrieval_task = tokio::spawn(start_retrieval( + retrieval_config, + cancellation_token.child_token(), + )); + // Wait for SIGTERM on the main thread and once received "unblock" tokio::signal::ctrl_c() .await @@ -394,8 +417,13 @@ impl Server { tracing::info!("sent shutdown signal"); // Wait for the tasks to finish - let (upload_result, rpc_task, pipeline_task, p2p_task) = - tokio::join!(storage_task, rpc_task, pipeline_task, p2p_task); + let (upload_result, rpc_task, pipeline_task, p2p_task, retrieval_task) = tokio::join!( + storage_task, + rpc_task, + pipeline_task, + p2p_task, + retrieval_task + ); // Inspect and log errors let (upload_result, rpc_task, pipeline_task, p2p_task) = @@ -406,6 +434,7 @@ impl Server { rpc_task??; pipeline_task??; p2p_task??; + retrieval_task??; Ok(()) } @@ -427,12 +456,19 @@ impl Server { let sealed_sector_storage_dir = Arc::new(self.storage_directory.join(SEALED_SECTOR_DIRECTORY_NAME)); let sealing_cache_dir = Arc::new(self.storage_directory.join(SEALING_CACHE_DIRECTORY_NANE)); + let index_dir = Arc::new(self.storage_directory.join(INDEXER_DIRECTORY_NAME)); // Create the storage directories tokio::fs::create_dir_all(car_piece_storage_dir.as_ref()).await?; tokio::fs::create_dir_all(unsealed_sector_storage_dir.as_ref()).await?; tokio::fs::create_dir_all(sealed_sector_storage_dir.as_ref()).await?; tokio::fs::create_dir_all(sealing_cache_dir.as_ref()).await?; + tokio::fs::create_dir_all(index_dir.as_ref()).await?; + + // Indexer used by the system + let indexer = Arc::new(RocksDBLid::new(RocksDBStateStoreConfig { + path: index_dir.deref().clone(), + })?); let (pipeline_tx, pipeline_rx) = tokio::sync::mpsc::unbounded_channel::(); @@ -465,7 +501,7 @@ impl Server { let pipeline_state = PipelineState { db: deal_database.clone(), server_info: rpc_state.server_info.clone(), - unsealed_sectors_dir: unsealed_sector_storage_dir, + unsealed_sectors_dir: unsealed_sector_storage_dir.clone(), sealed_sectors_dir: sealed_sector_storage_dir, sealing_cache_dir, porep_parameters: Arc::new(self.porep_parameters), @@ -483,12 +519,20 @@ impl Server { rendezvous_point: self.rendezvous_point, }; + let unsealed_sectors_dir = unsealed_sector_storage_dir.deref().clone(); + let retrieval_config = RetrievalServerConfig { + listen_address: self.retrieval_listen_address, + unsealed_sectors_dir, + indexer, + }; + Ok(SetupOutput { storage_state, rpc_state, pipeline_state, pipeline_rx, p2p_state, + retrieval_config, }) } diff --git a/storage-provider/server/src/retrieval/blockstore.rs b/storage-provider/server/src/retrieval/blockstore.rs new file mode 100644 index 00000000..b3edca9a --- /dev/null +++ b/storage-provider/server/src/retrieval/blockstore.rs @@ -0,0 +1,54 @@ +use std::{path::Path, sync::Arc}; + +use blockstore::Error; +use polka_index::local_index_directory::Service; + +/// The blockstore that reads blocks directly from unsealed sectors +pub struct ProviderBlockstore { + indexer: Arc, +} + +impl ProviderBlockstore { + pub fn new

(unsealed_sectors_path: P, indexer: Arc) -> Self + where + I: Service, + P: AsRef, + { + Self { indexer } + } +} + +impl blockstore::Blockstore for ProviderBlockstore +where + I: Send + Sync + 'static, +{ + async fn get( + &self, + _cid: &cid::CidGeneric, + ) -> blockstore::Result>> { + // TODO: Find the sector that holds the requested cid + // TODO: open a file handler to the sector file. + // TODO: Read the blocks from the sector + todo!() + } + + async fn put_keyed( + &self, + _cid: &cid::CidGeneric, + _data: &[u8], + ) -> blockstore::Result<()> { + Err(Error::FatalDatabaseError( + "put operation not supported".to_string(), + )) + } + + async fn remove(&self, _cid: &cid::CidGeneric) -> blockstore::Result<()> { + Err(Error::FatalDatabaseError( + "remove operation not supported".to_string(), + )) + } + + async fn close(self) -> blockstore::Result<()> { + Ok(()) + } +} diff --git a/storage-provider/server/src/retrieval/mod.rs b/storage-provider/server/src/retrieval/mod.rs new file mode 100644 index 00000000..5a758144 --- /dev/null +++ b/storage-provider/server/src/retrieval/mod.rs @@ -0,0 +1,43 @@ +use std::{path::PathBuf, sync::Arc}; + +use blockstore::ProviderBlockstore; +use libp2p::Multiaddr; +use polka_index::local_index_directory::Service; +use polka_storage_retrieval::Server; +use tokio_util::sync::CancellationToken; + +use crate::ServerError; + +mod blockstore; + +pub struct RetrievalServerConfig { + pub listen_address: Multiaddr, + pub unsealed_sectors_dir: PathBuf, + pub indexer: Arc, +} + +#[tracing::instrument(skip_all)] +pub async fn start_retrieval( + config: RetrievalServerConfig, + token: CancellationToken, +) -> Result<(), ServerError> +where + I: Service + Send + Sync + 'static, +{ + // Blockstore used by the retrieval server provider + let blockstore = Arc::new(ProviderBlockstore::new( + config.unsealed_sectors_dir, + config.indexer, + )); + + // Setup & run the retrieval server + let server = Server::new(blockstore)?; + server + .run(vec![config.listen_address], async move { + token.cancelled_owned().await; + tracing::trace!("shutting down the retrieval server"); + }) + .await?; + + Ok(()) +} diff --git a/storage-retrieval/lib/examples/simple_server.rs b/storage-retrieval/lib/examples/simple_server.rs index 0ad42df6..bcec600f 100644 --- a/storage-retrieval/lib/examples/simple_server.rs +++ b/storage-retrieval/lib/examples/simple_server.rs @@ -23,7 +23,7 @@ async fn main() -> Result<()> { // Setup & run the server let server = Server::new(blockstore)?; let listener: Multiaddr = format!("/ip4/127.0.0.1/tcp/8989").parse()?; - server.run(vec![listener]).await?; + server.run(vec![listener], std::future::pending()).await?; Ok(()) } diff --git a/storage-retrieval/lib/src/server.rs b/storage-retrieval/lib/src/server.rs index 7295432b..3ce52063 100644 --- a/storage-retrieval/lib/src/server.rs +++ b/storage-retrieval/lib/src/server.rs @@ -1,7 +1,7 @@ use std::{io, sync::Arc}; use blockstore::Blockstore; -use futures::StreamExt; +use futures::{pin_mut, Future, StreamExt}; use libp2p::{Multiaddr, PeerId, Swarm, TransportError}; use libp2p_core::ConnectedPoint; use libp2p_swarm::{ConnectionId, SwarmEvent}; @@ -43,17 +43,35 @@ where // Start the server. The server will stop if it received a cancellation // event or some error occurred. - pub async fn run(mut self, listeners: Vec) -> Result<(), ServerError> { + pub async fn run( + mut self, + listeners: Vec, + shutdown_signal: F, + ) -> Result<(), ServerError> + where + F: Future + Send + 'static, + { // Listen on for listener in listeners { self.swarm.listen_on(listener)?; } - // Keep server running + pin_mut!(shutdown_signal); + loop { - let event = self.swarm.select_next_some().await; - self.on_swarm_event(event)?; + tokio::select! { + event = self.swarm.select_next_some() => { + self.on_swarm_event(event)?; + + } + _ = &mut shutdown_signal => { + trace!("received shutdown signal"); + break; + } + } } + + Ok(()) } fn on_swarm_event(&mut self, event: SwarmEvent>) -> Result<(), ServerError> {