feat: integrate retrieval server #688

Draft · wants to merge 1 commit into base: develop
2 changes: 2 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default.

3 changes: 3 additions & 0 deletions storage-provider/server/Cargo.toml
@@ -14,14 +14,17 @@ delia = []
[dependencies]
# "Homegrown" crates
mater = { workspace = true }
polka-index = { workspace = true }
polka-storage-proofs = { workspace = true, features = ["std", "substrate"] }
polka-storage-provider-common = { workspace = true }
polka-storage-retrieval = { workspace = true }
primitives = { workspace = true, features = ["clap", "serde", "std"] }
storagext = { workspace = true, features = ["clap"] }

async-trait = { workspace = true }
axum = { workspace = true, features = ["macros", "multipart"] }
base64 = { workspace = true }
blockstore = { workspace = true }
chrono = { workspace = true, features = ["serde"] }
ciborium = { workspace = true }
cid = { workspace = true, features = ["serde", "std"] }
10 changes: 10 additions & 0 deletions storage-provider/server/src/config.rs
@@ -35,6 +35,11 @@ fn default_node_address() -> Url {
Url::parse(DEFAULT_NODE_ADDRESS).expect("DEFAULT_NODE_ADDRESS must be a valid Url")
}

fn default_retrieval_address() -> Multiaddr {
    "/ip4/127.0.0.1/tcp/8002"
        .parse()
        .expect("default retrieval multiaddress must be valid")
}

#[derive(Debug, Clone, Deserialize, Args)]
#[group(multiple = true, conflicts_with = "config")]
#[serde(deny_unknown_fields)]
@@ -54,6 +59,11 @@ pub struct ConfigurationArgs {
#[arg(long, default_value_t = default_node_address())]
pub(crate) node_url: Url,

/// Retrieval server listen address.
#[serde(default = "default_retrieval_address")]
#[arg(long, default_value_t = default_retrieval_address())]
pub(crate) retrieval_listen_address: Multiaddr,

/// RocksDB storage directory.
/// Defaults to a temporary random directory, like `/tmp/<random>/deals_database`.
#[arg(long)]
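The new retrieval_listen_address option takes a libp2p multiaddress, mirroring default_retrieval_address above. A minimal sketch of the accepted format (the non-default override below is purely illustrative and not part of this change):

use libp2p::Multiaddr;

fn main() {
    // Default used when the flag / config field is omitted.
    let default: Multiaddr = "/ip4/127.0.0.1/tcp/8002".parse().unwrap();
    // Hypothetical operator override listening on all interfaces.
    let custom: Multiaddr = "/ip4/0.0.0.0/tcp/8002".parse().unwrap();
    println!("default: {default}, override: {custom}");
}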
52 changes: 48 additions & 4 deletions storage-provider/server/src/main.rs
@@ -7,10 +7,11 @@ mod db;
mod local_index_directory;
mod p2p;
mod pipeline;
mod retrieval;
mod rpc;
mod storage;

use std::{env::temp_dir, net::SocketAddr, path::PathBuf, sync::Arc, time::Duration};
use std::{env::temp_dir, net::SocketAddr, ops::Deref, path::PathBuf, sync::Arc, time::Duration};

use clap::Parser;
use libp2p::{identity::Keypair, Multiaddr, PeerId};
@@ -19,13 +20,15 @@ use p2p::{
RegisterConfig,
};
use pipeline::types::PipelineMessage;
use polka_index::local_index_directory::rdb::{RocksDBLid, RocksDBStateStoreConfig};
use polka_storage_proofs::{
porep::{self, PoRepParameters},
post::{self, PoStParameters},
};
use polka_storage_provider_common::rpc::ServerInfo;
use primitives::proofs::{RegisteredPoStProof, RegisteredSealProof};
use rand::Rng;
use retrieval::{start_retrieval, RetrievalServerConfig};
use storagext::{
multipair::{MultiPairArgs, MultiPairSigner},
runtime::runtime_types::{
@@ -73,6 +76,9 @@ pub(crate) const SEALED_SECTOR_DIRECTORY_NAME: &str = "sealed";
/// Name for the directory where the sealing cache is kept.
pub(crate) const SEALING_CACHE_DIRECTORY_NANE: &str = "cache";

/// Name of the directory where the index is kept.
pub(crate) const INDEXER_DIRECTORY_NAME: &str = "index";

fn get_random_temporary_folder() -> PathBuf {
temp_dir().join(
rand::thread_rng()
@@ -89,6 +95,7 @@ struct SetupOutput {
pipeline_state: PipelineState,
pipeline_rx: UnboundedReceiver<PipelineMessage>,
p2p_state: P2PState,
retrieval_config: RetrievalServerConfig<RocksDBLid>,
}

fn main() -> Result<(), ServerError> {
@@ -177,6 +184,12 @@ pub enum ServerError {

#[error(transparent)]
P2P(#[from] P2PError),

#[error(transparent)]
RetrievalServer(#[from] polka_storage_retrieval::server::ServerError),

#[error(transparent)]
Lid(#[from] polka_index::local_index_directory::LidError),
}

/// Takes an expression that returns a `Result<Result<T, E2>, E1>`.
@@ -233,6 +246,9 @@ pub struct Server {
/// Parachain node RPC url.
node_url: Url,

/// Retrieval server listen address.
retrieval_listen_address: Multiaddr,

/// Storage provider key pair.
multi_pair_signer: MultiPairSigner,

@@ -354,6 +370,7 @@ impl TryFrom<ServerCli> for Server {
p2p_key: args.p2p_key,
rendezvous_point_address: args.rendezvous_point_address,
rendezvous_point: args.rendezvous_point,
retrieval_listen_address: args.retrieval_listen_address,
})
}
}
@@ -366,6 +383,7 @@ impl Server {
pipeline_state,
pipeline_rx,
p2p_state,
retrieval_config,
} = self.setup().await?;

let cancellation_token = CancellationToken::new();
@@ -385,6 +403,11 @@ impl Server {
cancellation_token.child_token(),
));

let retrieval_task = tokio::spawn(start_retrieval(
retrieval_config,
cancellation_token.child_token(),
));

// Wait for SIGTERM on the main thread and once received "unblock"
tokio::signal::ctrl_c()
.await
@@ -395,8 +418,13 @@ impl Server {
tracing::info!("sent shutdown signal");

// Wait for the tasks to finish
let (upload_result, rpc_task, pipeline_task, p2p_task) =
tokio::join!(storage_task, rpc_task, pipeline_task, p2p_task);
let (upload_result, rpc_task, pipeline_task, p2p_task, retrieval_task) = tokio::join!(
storage_task,
rpc_task,
pipeline_task,
p2p_task,
retrieval_task
);

// Inspect and log errors
let (upload_result, rpc_task, pipeline_task, p2p_task) =
@@ -407,6 +435,7 @@ impl Server {
rpc_task??;
pipeline_task??;
p2p_task??;
retrieval_task??;

Ok(())
}
@@ -428,12 +457,19 @@ impl Server {
let sealed_sector_storage_dir =
Arc::new(self.storage_directory.join(SEALED_SECTOR_DIRECTORY_NAME));
let sealing_cache_dir = Arc::new(self.storage_directory.join(SEALING_CACHE_DIRECTORY_NANE));
let index_dir = Arc::new(self.storage_directory.join(INDEXER_DIRECTORY_NAME));

// Create the storage directories
tokio::fs::create_dir_all(car_piece_storage_dir.as_ref()).await?;
tokio::fs::create_dir_all(unsealed_sector_storage_dir.as_ref()).await?;
tokio::fs::create_dir_all(sealed_sector_storage_dir.as_ref()).await?;
tokio::fs::create_dir_all(sealing_cache_dir.as_ref()).await?;
tokio::fs::create_dir_all(index_dir.as_ref()).await?;

// Indexer used by the system
let indexer = Arc::new(RocksDBLid::new(RocksDBStateStoreConfig {
path: index_dir.deref().clone(),
})?);

let (pipeline_tx, pipeline_rx) = tokio::sync::mpsc::unbounded_channel::<PipelineMessage>();

@@ -466,7 +502,7 @@ impl Server {
let pipeline_state = PipelineState {
db: deal_database.clone(),
server_info: rpc_state.server_info.clone(),
unsealed_sectors_dir: unsealed_sector_storage_dir,
unsealed_sectors_dir: unsealed_sector_storage_dir.clone(),
sealed_sectors_dir: sealed_sector_storage_dir,
sealing_cache_dir,
porep_parameters: Arc::new(self.porep_parameters),
@@ -484,12 +520,20 @@ impl Server {
rendezvous_point: self.rendezvous_point,
};

let unsealed_sectors_dir = unsealed_sector_storage_dir.deref().clone();
let retrieval_config = RetrievalServerConfig {
listen_address: self.retrieval_listen_address,
unsealed_sectors_dir,
indexer,
};

Ok(SetupOutput {
storage_state,
rpc_state,
pipeline_state,
pipeline_rx,
p2p_state,
retrieval_config,
})
}

54 changes: 54 additions & 0 deletions storage-provider/server/src/retrieval/blockstore.rs
@@ -0,0 +1,54 @@
use std::{
    path::{Path, PathBuf},
    sync::Arc,
};

use blockstore::Error;
use polka_index::local_index_directory::Service;

/// A blockstore that reads blocks directly from unsealed sectors.
pub struct ProviderBlockstore<I> {
    /// Directory containing the unsealed sector files.
    unsealed_sectors_path: PathBuf,
    /// Index used to find which sector holds a given CID.
    indexer: Arc<I>,
}

impl<I> ProviderBlockstore<I> {
    pub fn new<P>(unsealed_sectors_path: P, indexer: Arc<I>) -> Self
    where
        I: Service,
        P: AsRef<Path>,
    {
        Self {
            unsealed_sectors_path: unsealed_sectors_path.as_ref().to_path_buf(),
            indexer,
        }
    }
}

impl<I> blockstore::Blockstore for ProviderBlockstore<I>
where
    I: Send + Sync + 'static,
{
    async fn get<const S: usize>(
        &self,
        _cid: &cid::CidGeneric<S>,
    ) -> blockstore::Result<Option<Vec<u8>>> {
        // TODO: Find the sector that holds the requested CID.
        // TODO: Open a file handle to the sector file.
        // TODO: Read the block out of the sector.
        todo!()
    }

    async fn put_keyed<const S: usize>(
        &self,
        _cid: &cid::CidGeneric<S>,
        _data: &[u8],
    ) -> blockstore::Result<()> {
        Err(Error::FatalDatabaseError(
            "put operation not supported".to_string(),
        ))
    }

    async fn remove<const S: usize>(&self, _cid: &cid::CidGeneric<S>) -> blockstore::Result<()> {
        Err(Error::FatalDatabaseError(
            "remove operation not supported".to_string(),
        ))
    }

    async fn close(self) -> blockstore::Result<()> {
        Ok(())
    }
}
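The TODOs in get outline the intended read path: find which sector holds the requested CID, open that sector file, and read the block out of it. The following self-contained sketch shows that shape; the BlockLocator trait and BlockLocation struct are hypothetical stand-ins (the real lookup API provided by polka_index is not shown in this diff), and the offset/length bookkeeping is assumed.

use std::{
    fs::File,
    io::{Read, Seek, SeekFrom},
    path::{Path, PathBuf},
};

/// Where a block lives inside an unsealed sector file (hypothetical layout).
struct BlockLocation {
    sector_file: String,
    offset: u64,
    length: usize,
}

/// Hypothetical CID-bytes-to-location lookup standing in for the indexer.
trait BlockLocator {
    fn locate(&self, cid_bytes: &[u8]) -> Option<BlockLocation>;
}

/// Read one block from the unsealed sectors directory, or None if the CID
/// is unknown to the index.
fn read_block<L: BlockLocator>(
    unsealed_sectors_dir: &Path,
    locator: &L,
    cid_bytes: &[u8],
) -> std::io::Result<Option<Vec<u8>>> {
    // 1. Find the sector that holds the requested CID.
    let Some(location) = locator.locate(cid_bytes) else {
        return Ok(None);
    };

    // 2. Open a file handle to the sector file.
    let path: PathBuf = unsealed_sectors_dir.join(&location.sector_file);
    let mut file = File::open(path)?;

    // 3. Read the block from the sector at the recorded offset.
    file.seek(SeekFrom::Start(location.offset))?;
    let mut block = vec![0u8; location.length];
    file.read_exact(&mut block)?;
    Ok(Some(block))
}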
43 changes: 43 additions & 0 deletions storage-provider/server/src/retrieval/mod.rs
@@ -0,0 +1,43 @@
use std::{path::PathBuf, sync::Arc};

use blockstore::ProviderBlockstore;
use libp2p::Multiaddr;
use polka_index::local_index_directory::Service;
use polka_storage_retrieval::Server;
use tokio_util::sync::CancellationToken;

use crate::ServerError;

mod blockstore;

pub struct RetrievalServerConfig<I> {
    pub listen_address: Multiaddr,
    pub unsealed_sectors_dir: PathBuf,
    pub indexer: Arc<I>,
}

#[tracing::instrument(skip_all)]
pub async fn start_retrieval<I>(
    config: RetrievalServerConfig<I>,
    token: CancellationToken,
) -> Result<(), ServerError>
where
    I: Service + Send + Sync + 'static,
{
    // Blockstore used by the retrieval server
    let blockstore = Arc::new(ProviderBlockstore::new(
        config.unsealed_sectors_dir,
        config.indexer,
    ));

    // Setup & run the retrieval server
    let server = Server::new(blockstore)?;
    server
        .run(vec![config.listen_address], async move {
            token.cancelled_owned().await;
            tracing::trace!("shutting down the retrieval server");
        })
        .await?;

    Ok(())
}
2 changes: 1 addition & 1 deletion storage-retrieval/lib/examples/simple_server.rs
@@ -23,7 +23,7 @@ async fn main() -> Result<()> {
// Setup & run the server
let server = Server::new(blockstore)?;
let listener: Multiaddr = format!("/ip4/127.0.0.1/tcp/8989").parse()?;
server.run(vec![listener]).await?;
server.run(vec![listener], std::future::pending()).await?;

Ok(())
}
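With the new shutdown parameter, the example above keeps the retrieval server running indefinitely via std::future::pending(). A small variation (a sketch only; it assumes the example crate enables tokio's signal feature) would instead shut the example down cleanly on Ctrl-C:

// Drop-in replacement for the `server.run(...)` call above.
server
    .run(vec![listener], async {
        // Resolves once Ctrl-C (SIGINT) is received.
        let _ = tokio::signal::ctrl_c().await;
    })
    .await?;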
28 changes: 23 additions & 5 deletions storage-retrieval/lib/src/server.rs
@@ -1,7 +1,7 @@
use std::{io, sync::Arc};

use blockstore::Blockstore;
use futures::StreamExt;
use futures::{pin_mut, Future, StreamExt};
use libp2p::{Multiaddr, PeerId, Swarm, TransportError};
use libp2p_core::ConnectedPoint;
use libp2p_swarm::{ConnectionId, SwarmEvent};
@@ -43,17 +43,35 @@ where

// Start the server. The server stops when the shutdown signal resolves
// or when an error occurs.
pub async fn run(mut self, listeners: Vec<Multiaddr>) -> Result<(), ServerError> {
pub async fn run<F>(
mut self,
listeners: Vec<Multiaddr>,
shutdown_signal: F,
) -> Result<(), ServerError>
where
F: Future<Output = ()> + Send + 'static,
{
// Listen on
for listener in listeners {
self.swarm.listen_on(listener)?;
}

// Keep the server running until an error occurs or shutdown is signalled
pin_mut!(shutdown_signal);

loop {
let event = self.swarm.select_next_some().await;
self.on_swarm_event(event)?;
tokio::select! {
event = self.swarm.select_next_some() => {
self.on_swarm_event(event)?;
}
_ = &mut shutdown_signal => {
trace!("received shutdown signal");
break;
}
}
}

Ok(())
}

fn on_swarm_event(&mut self, event: SwarmEvent<BehaviourEvent<B>>) -> Result<(), ServerError> {