diff --git a/cli/polka-storage-provider/server/src/main.rs b/cli/polka-storage-provider/server/src/main.rs
index 445ef5ee7..3b76dfe15 100644
--- a/cli/polka-storage-provider/server/src/main.rs
+++ b/cli/polka-storage-provider/server/src/main.rs
@@ -56,10 +56,10 @@ const RETRY_NUMBER: u32 = 5;
 const CAR_PIECE_DIRECTORY_NAME: &str = "car";
 
 /// Name for the directory where the unsealed pieces are kept.
-const UNSEALED_PIECE_DIRECTORY_NAME: &str = "unsealed";
+const UNSEALED_SECTOR_DIRECTORY_NAME: &str = "unsealed";
 
 /// Name for the directory where the sealed pieces are kept.
-const SEALED_PIECE_DIRECTORY_NAME: &str = "sealed";
+const SEALED_SECTOR_DIRECTORY_NAME: &str = "sealed";
 
 /// Name for the directory where the sealing cache is kept.
 const SEALING_CACHE_DIRECTORY_NANE: &str = "cache";
@@ -351,16 +351,16 @@ impl ServerConfiguration {
         // Car piece storage directory — i.e. the CAR archives from the input streams
         let car_piece_storage_dir =
             Arc::new(self.storage_directory.join(CAR_PIECE_DIRECTORY_NAME));
-        let unsealed_piece_storage_dir =
-            Arc::new(self.storage_directory.join(UNSEALED_PIECE_DIRECTORY_NAME));
-        let sealed_piece_storage_dir =
-            Arc::new(self.storage_directory.join(SEALED_PIECE_DIRECTORY_NAME));
+        let unsealed_sector_storage_dir =
+            Arc::new(self.storage_directory.join(UNSEALED_SECTOR_DIRECTORY_NAME));
+        let sealed_sector_storage_dir =
+            Arc::new(self.storage_directory.join(SEALED_SECTOR_DIRECTORY_NAME));
         let sealing_cache_dir =
             Arc::new(self.storage_directory.join(SEALING_CACHE_DIRECTORY_NANE));
 
         // Create the storage directories
         tokio::fs::create_dir_all(car_piece_storage_dir.as_ref()).await?;
-        tokio::fs::create_dir_all(unsealed_piece_storage_dir.as_ref()).await?;
-        tokio::fs::create_dir_all(sealed_piece_storage_dir.as_ref()).await?;
+        tokio::fs::create_dir_all(unsealed_sector_storage_dir.as_ref()).await?;
+        tokio::fs::create_dir_all(sealed_sector_storage_dir.as_ref()).await?;
 
         let (tx, rx) = tokio::sync::mpsc::unbounded_channel::<PipelineMessage>();
@@ -387,8 +387,8 @@ impl ServerConfiguration {
 
         let pipeline_state = PipelineState {
             server_info: rpc_state.server_info.clone(),
-            unsealed_piece_storage_dir,
-            sealed_piece_storage_dir,
+            unsealed_sectors_dir: unsealed_sector_storage_dir,
+            sealed_sectors_dir: sealed_sector_storage_dir,
             sealing_cache_dir,
             xt_client: xt_client,
             xt_keypair: self.multi_pair_signer,
diff --git a/cli/polka-storage-provider/server/src/pipeline.rs b/cli/polka-storage-provider/server/src/pipeline.rs
index 0e5fdf910..fafc39cf9 100644
--- a/cli/polka-storage-provider/server/src/pipeline.rs
+++ b/cli/polka-storage-provider/server/src/pipeline.rs
@@ -1,29 +1,58 @@
 use std::{path::PathBuf, sync::Arc};
 
 use cid::Cid;
-use polka_storage_proofs::porep::sealer::{prepare_piece, PreCommitOutput, Sealer};
+use polka_storage_proofs::porep::{
+    sealer::{prepare_piece, PreCommitOutput, Sealer},
+    PoRepError,
+};
 use polka_storage_provider_common::rpc::ServerInfo;
-use primitives_proofs::RegisteredSealProof;
-use tokio::sync::mpsc::UnboundedReceiver;
+use primitives_proofs::{RegisteredSealProof, SectorNumber};
+use storagext::{
+    types::{market::ClientDealProposal, storage_provider::SectorPreCommitInfo},
+    StorageProviderClientExt, SystemClientExt,
+};
+use tokio::{
+    sync::mpsc::UnboundedReceiver,
+    task::{JoinError, JoinHandle, JoinSet},
+};
 use tokio_util::sync::CancellationToken;
 
 // PLACEHOLDERS!!!!!
+// TODO(@th7nder,29/10/2024): #474
 const SECTOR_ID: u64 = 77;
+// TODO(@th7nder,29/10/2024): fix after #485 is merged
 const PROVER_ID: [u8; 32] = [0u8; 32];
+// TODO(@th7nder,29/10/2024): get from pallet randomness
 const TICKET: [u8; 32] = [12u8; 32];
 // const SEED: [u8; 32] = [13u8; 32];
 const SECTOR_EXPIRATION_MARGIN: u64 = 20;
 
 #[derive(Debug)]
 pub enum PipelineMessage {
-    PreCommit,
+    PreCommit {
+        deal: ClientDealProposal,
+        published_deal_id: u64,
+        piece_path: PathBuf,
+        piece_cid: Cid,
+    },
 }
+#[derive(Debug, thiserror::Error)]
+pub enum PipelineError {
+    #[error(transparent)]
+    PoRepError(#[from] PoRepError),
+    #[error(transparent)]
+    Join(#[from] JoinError),
+    #[error("Pre-commit took too long: current block {0} is already past the deal start {1}")]
+    SealingTooSlow(u64, u64),
+    #[error(transparent)]
+    Subxt(#[from] subxt::Error),
+}
 
 /// Pipeline shared state.
 pub struct PipelineState {
     pub server_info: ServerInfo,
-    pub unsealed_piece_storage_dir: Arc<PathBuf>,
-    pub sealed_piece_storage_dir: Arc<PathBuf>,
+    pub unsealed_sectors_dir: Arc<PathBuf>,
+    pub sealed_sectors_dir: Arc<PathBuf>,
     pub sealing_cache_dir: Arc<PathBuf>,
 
     pub xt_client: Arc,
@@ -36,21 +65,135 @@ pub async fn start_pipeline(
     mut receiver: UnboundedReceiver<PipelineMessage>,
     token: CancellationToken,
 ) -> Result<(), std::io::Error> {
+    let mut futs = JoinSet::new();
+
     loop {
         tokio::select! {
             msg = receiver.recv() => {
                 tracing::info!("Received msg: {:?}", msg);
-                todo!();
+                match msg {
+                    Some(msg) => {
+                        match msg {
+                            PipelineMessage::PreCommit { deal, published_deal_id, piece_path, piece_cid } => {
+                                futs.spawn(precommit(state.clone(), deal, published_deal_id, SECTOR_ID, piece_path, piece_cid, token.clone()));
+                            },
+                        }
+                    },
+                    None => {
+                        tracing::info!("Channel has been closed...");
+                        break;
+                    },
+                }
             },
             _ = token.cancelled() => {
-                tracing::info!("Pipeline has been stopped...");
-                return Ok(())
+                tracing::info!("Pipeline has been stopped by the cancellation token...");
+                break;
             },
         }
     }
+
+    // TODO: should we somehow propagate inner errors to the main thread?
+    while let Some(res) = futs.join_next().await {
+        let _ = res.inspect_err(|err| tracing::error!(%err)).inspect(|ok| {
+            let _ = ok.as_ref().inspect_err(|err| tracing::error!(%err));
+        });
+    }
+
+    Ok(())
+}
+
+async fn precommit(
+    state: Arc<PipelineState>,
+    deal: ClientDealProposal,
+    deal_id: u64,
+    sector_id: SectorNumber,
+    piece_path: PathBuf,
+    piece_cid: Cid,
+    token: CancellationToken,
+) -> Result<(), PipelineError> {
+    tracing::debug!(
+        "Starting pre-commit task for deal {}, sector {}",
+        deal_id,
+        sector_id
+    );
+    // Questions to be answered:
+    // * what happens if some of it fails? SP will be slashed, and there is no error reporting?
+    // * where do we save the state of a sector/deals, how do we keep track of it?
+    let sealing_handle: JoinHandle<Result<PreCommitOutput, PoRepError>> = {
+        let state = state.clone();
+
+        tokio::task::spawn_blocking(move || {
+            let cache_dir = state.sealing_cache_dir.clone();
+            let unsealed_dir = state.unsealed_sectors_dir.clone();
+            let sealed_dir = state.sealed_sectors_dir.clone();
+            create_replica(
+                sector_id,
+                unsealed_dir,
+                sealed_dir,
+                cache_dir,
+                piece_path,
+                state.server_info.seal_proof,
+                piece_cid,
+            )
+        })
+    };
+
+    let sealing_output = tokio::select! {
+        res = sealing_handle => {
+            res??
+        },
+        _ = token.cancelled() => {
+            tracing::warn!("Cancelled sealing process...");
+            return Ok(())
+        }
+    };
+    tracing::info!("Created sector's replica: {:?}", sealing_output);
+
+    let deal_start = deal.deal_proposal.start_block;
+    let deal_duration = deal.deal_proposal.end_block - deal_start;
+    let current_block = state.xt_client.height(false).await?;
+    if current_block > deal_start {
+        return Err(PipelineError::SealingTooSlow(current_block, deal_start));
+    }
+
+    let result = state
+        .xt_client
+        .pre_commit_sectors(
+            &state.xt_keypair,
+            vec![SectorPreCommitInfo {
+                deal_ids: vec![deal_id],
+                expiration: deal_start + deal_duration + SECTOR_EXPIRATION_MARGIN,
+                sector_number: sector_id,
+                seal_proof: state.server_info.seal_proof,
+                sealed_cid: primitives_commitment::Commitment::new(
+                    sealing_output.comm_r,
+                    primitives_commitment::CommitmentKind::Replica,
+                )
+                .cid(),
+                unsealed_cid: primitives_commitment::Commitment::new(
+                    sealing_output.comm_d,
+                    primitives_commitment::CommitmentKind::Data,
+                )
+                .cid(),
+            }],
+        )
+        .await?;
+
+    let precommitted_sectors = result
+        .events
+        .find::()
+        .collect::<Result<Vec<_>, _>>()?;
+
+    tracing::info!(
+        "Successfully pre-committed sectors on-chain: {:?}",
+        precommitted_sectors
+    );
+
+    Ok(())
 }
 
 fn create_replica(
+    sector_id: SectorNumber,
     unsealed_dir: Arc<PathBuf>,
     sealed_dir: Arc<PathBuf>,
     cache_dir: Arc<PathBuf>,
@@ -64,18 +207,16 @@
         .try_into()
         .expect("piece_cid should have been validated on proposal");
 
-    let unsealed_sector_path = unsealed_dir.join(piece_cid.to_string());
+    let unsealed_sector_path = unsealed_dir.join(sector_id.to_string());
 
     let sealed_sector_path = {
-        let path = sealed_dir.join(piece_cid.to_string());
+        let path = sealed_dir.join(sector_id.to_string());
         // We need to create the file ourselves, even though that's not documented
         std::fs::File::create(&path)?;
         path
     };
 
     let sealer = Sealer::new(seal_proof);
-    let prepared_piece = prepare_piece(piece_path, piece_commitment)?;
-
     let piece_infos = {
         // The scope creates an implicit drop of the file handler
         // avoiding reading issues later on
@@ -93,57 +234,3 @@
         &piece_infos,
     )
 }
-
-/*
-
-    let deal_start = deal.deal_proposal.start_block;
-    let deal_duration = deal.deal_proposal.end_block - deal_start;
-
-
-    let sealing_result = sealing_result.await.map_err(|err| RpcError::internal_error(err, None))??;
-    tracing::info!("Created sector's replica: {:?}", sealing_result);
-
-    // Questions to be answered:
-    // * what happens if some of it fails? SP will be slashed, and there is no error reporting?
-    // * where do we save the state of a sector/deals, how do we keep track of it?
-    let sealing_result: JoinHandle> =
-        tokio::task::spawn_blocking(move || {
-
-        });
-
-    let current_block = self.xt_client.height(false).await?;
-    if current_block > deal_start {
-        return Err(RpcError::internal_error(format!("Pre-commit took too much time... Cannot start pre-commit now, current_block: {}, deal_start: {}", current_block, deal_start), None));
-    }
-
-
-    let result = self
-        .xt_client
-        .pre_commit_sectors(
-            &self.xt_keypair,
-            vec![SectorPreCommitInfo {
-                deal_ids: bounded_vec![deal_id],
-                expiration: deal_start + deal_duration + SECTOR_EXPIRATION_MARGIN,
-                sector_number: SECTOR_ID,
-                seal_proof,
-                sealed_cid: primitives_commitment::Commitment::new(
-                    sealing_result.comm_r,
-                    primitives_commitment::CommitmentKind::Replica,
-                )
-                .cid(),
-                unsealed_cid: primitives_commitment::Commitment::new(
-                    sealing_result.comm_d,
-                    primitives_commitment::CommitmentKind::Data,
-                )
-                .cid(),
-            }],
-        )
-        .await?;
-
-    let precommited_sectors = result
-        .events
-        .find::()
-        .collect::<Result<Vec<_>, _>>()
-        .map_err(|err| RpcError::internal_error(err, None))?;
-
-*/
diff --git a/cli/polka-storage-provider/server/src/rpc.rs b/cli/polka-storage-provider/server/src/rpc.rs
index 7220ff7fe..ac78a709b 100644
--- a/cli/polka-storage-provider/server/src/rpc.rs
+++ b/cli/polka-storage-provider/server/src/rpc.rs
@@ -94,7 +94,7 @@ impl StorageProviderRpcServer for RpcServerState {
         // it just requires some API design
         let result = self
             .xt_client
-            .publish_signed_storage_deals(&self.xt_keypair, vec![deal])
+            .publish_signed_storage_deals(&self.xt_keypair, vec![deal.clone()])
             .await?;
 
         let published_deals = result
@@ -107,12 +107,18 @@ impl StorageProviderRpcServer for RpcServerState {
         // an error MUST've happened
         debug_assert_eq!(published_deals.len(), 1);
 
+        // We always publish only 1 deal
+        let deal_id = published_deals[0].deal_id;
+
         self.pipeline_sender
-            .send(PipelineMessage::PreCommit)
+            .send(PipelineMessage::PreCommit {
+                deal,
+                published_deal_id: deal_id,
+                piece_path,
+                piece_cid,
+            })
             .map_err(|e| RpcError::internal_error(e, None))?;
 
-        // We always publish only 1 deal
-        let deal_id = published_deals[0].deal_id;
         Ok(deal_id)
     }
 }
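
For reviewers unfamiliar with the concurrency pattern introduced in start_pipeline, here is a minimal, self-contained sketch of the same receive/spawn/drain shape. It is not part of this patch: Message, work, and run are hypothetical stand-ins for PipelineMessage, precommit, and start_pipeline; only the tokio and tokio-util APIs (unbounded mpsc channel, JoinSet, select!, CancellationToken) are real. Cargo deps assumed: tokio = { version = "1", features = ["full"] } and tokio-util = "0.7".

// Illustrative sketch only -- not repository code.
use std::time::Duration;

use tokio::{sync::mpsc, task::JoinSet};
use tokio_util::sync::CancellationToken;

#[derive(Debug)]
enum Message {
    // Stand-in for `PipelineMessage::PreCommit { .. }`.
    DoWork { id: u64 },
}

// Stand-in for `precommit`: a long-running job that honours cancellation.
async fn work(id: u64, token: CancellationToken) -> Result<(), String> {
    tokio::select! {
        _ = tokio::time::sleep(Duration::from_millis(100)) => Ok(()),
        _ = token.cancelled() => {
            eprintln!("work {id} cancelled");
            Ok(())
        }
    }
}

async fn run(mut rx: mpsc::UnboundedReceiver<Message>, token: CancellationToken) {
    let mut futs = JoinSet::new();
    loop {
        tokio::select! {
            msg = rx.recv() => match msg {
                // Each message becomes an independent task, so the loop is free
                // to accept the next message while the previous one is running.
                Some(Message::DoWork { id }) => {
                    futs.spawn(work(id, token.clone()));
                }
                // All senders dropped: no more work will arrive.
                None => break,
            },
            _ = token.cancelled() => break,
        }
    }
    // Drain in-flight tasks before returning, logging failures instead of
    // propagating them (mirrors the TODO in `start_pipeline`).
    while let Some(res) = futs.join_next().await {
        if let Err(err) = res.map_err(|e| e.to_string()).and_then(|inner| inner) {
            eprintln!("task failed: {err}");
        }
    }
}

#[tokio::main]
async fn main() {
    let (tx, rx) = mpsc::unbounded_channel();
    let token = CancellationToken::new();
    let pipeline = tokio::spawn(run(rx, token.clone()));

    tx.send(Message::DoWork { id: 77 }).unwrap();
    drop(tx); // closing the channel lets `run` exit its loop and drain tasks
    pipeline.await.unwrap();
}

The relevant design choice in the diff is the same as in the sketch: on shutdown the loop breaks out instead of returning early, so every already-spawned sealing task is still awaited via the JoinSet before the pipeline exits.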