diff --git a/cli/polka-storage-provider/server/src/main.rs b/cli/polka-storage-provider/server/src/main.rs index fe05deebe..49884cc4b 100644 --- a/cli/polka-storage-provider/server/src/main.rs +++ b/cli/polka-storage-provider/server/src/main.rs @@ -364,7 +364,7 @@ impl ServerConfiguration { tokio::fs::create_dir_all(unsealed_sector_storage_dir.as_ref()).await?; tokio::fs::create_dir_all(sealed_sector_storage_dir.as_ref()).await?; - let (tx, rx) = tokio::sync::mpsc::unbounded_channel::(); + let (pipeline_tx, pipeline_rx) = tokio::sync::mpsc::unbounded_channel::(); let storage_state = StorageServerState { car_piece_storage_dir: car_piece_storage_dir.clone(), @@ -384,7 +384,7 @@ impl ServerConfiguration { xt_client: xt_client.clone(), xt_keypair: self.multi_pair_signer.clone(), listen_address: self.rpc_listen_address, - pipeline_sender: tx, + pipeline_sender: pipeline_tx, }; let pipeline_state = PipelineState { @@ -392,11 +392,11 @@ impl ServerConfiguration { unsealed_sectors_dir: unsealed_sector_storage_dir, sealed_sectors_dir: sealed_sector_storage_dir, sealing_cache_dir, - xt_client: xt_client, + xt_client, xt_keypair: self.multi_pair_signer, }; - Ok((storage_state, rpc_state, pipeline_state, rx)) + Ok((storage_state, rpc_state, pipeline_state, pipeline_rx)) } async fn setup_storagext_client( diff --git a/cli/polka-storage-provider/server/src/pipeline.rs b/cli/polka-storage-provider/server/src/pipeline.rs index fafc39cf9..b667ef29c 100644 --- a/cli/polka-storage-provider/server/src/pipeline.rs +++ b/cli/polka-storage-provider/server/src/pipeline.rs @@ -6,11 +6,12 @@ use polka_storage_proofs::porep::{ PoRepError, }; use polka_storage_provider_common::rpc::ServerInfo; -use primitives_proofs::{RegisteredSealProof, SectorNumber}; +use primitives_proofs::{derive_prover_id, RegisteredSealProof, SectorNumber}; use storagext::{ types::{market::ClientDealProposal, storage_provider::SectorPreCommitInfo}, StorageProviderClientExt, SystemClientExt, }; +use subxt::tx::Signer; use tokio::{ sync::mpsc::UnboundedReceiver, task::{JoinError, JoinHandle, JoinSet}, @@ -20,21 +21,22 @@ use tokio_util::sync::CancellationToken; // PLACEHOLDERS!!!!! // TODO(@th7nder,29/10/2024): #474 const SECTOR_ID: u64 = 77; -// TODO(@th7nder,29/10/2024): fix after #485 is merged -const PROVER_ID: [u8; 32] = [0u8; 32]; // TODO(@th7nder,29/10/2024): get from pallet randomness const TICKET: [u8; 32] = [12u8; 32]; // const SEED: [u8; 32] = [13u8; 32]; const SECTOR_EXPIRATION_MARGIN: u64 = 20; +#[derive(Debug)] +pub struct PreCommitMessage { + pub deal: ClientDealProposal, + pub published_deal_id: u64, + pub piece_path: PathBuf, + pub piece_cid: Cid, +} + #[derive(Debug)] pub enum PipelineMessage { - PreCommit { - deal: ClientDealProposal, - published_deal_id: u64, - piece_path: PathBuf, - piece_cid: Cid, - }, + PreCommit(PreCommitMessage), } #[derive(Debug, thiserror::Error)] @@ -73,11 +75,7 @@ pub async fn start_pipeline( tracing::info!("Received msg: {:?}", msg); match msg { Some(msg) => { - match msg { - PipelineMessage::PreCommit { deal, published_deal_id, piece_path, piece_cid } => { - futs.spawn(precommit(state.clone(), deal, published_deal_id, SECTOR_ID, piece_path, piece_cid, token.clone())); - }, - } + process(&mut futs, msg, state.clone(), token.clone()); }, None => { tracing::info!("Channel has been closed..."); @@ -102,6 +100,32 @@ pub async fn start_pipeline( Ok(()) } +fn process( + futs: &mut JoinSet>, + msg: PipelineMessage, + state: Arc, + token: CancellationToken, +) { + match msg { + PipelineMessage::PreCommit(PreCommitMessage { + deal, + published_deal_id, + piece_path, + piece_cid, + }) => { + futs.spawn(precommit( + state, + deal, + published_deal_id, + SECTOR_ID, + piece_path, + piece_cid, + token, + )); + } + } +} + async fn precommit( state: Arc, deal: ClientDealProposal, @@ -121,13 +145,14 @@ async fn precommit( // * where do we save the state of a sector/deals, how do we keep track of it? let sealing_handle: JoinHandle> = { let state = state.clone(); - + let prover_id = derive_prover_id(state.xt_keypair.account_id()); + let cache_dir = state.sealing_cache_dir.clone(); + let unsealed_dir = state.sealed_sectors_dir.clone(); + let sealed_dir = state.sealed_sectors_dir.clone(); tokio::task::spawn_blocking(move || { - let cache_dir = state.sealing_cache_dir.clone(); - let unsealed_dir = state.sealed_sectors_dir.clone(); - let sealed_dir = state.sealed_sectors_dir.clone(); create_replica( sector_id, + prover_id, unsealed_dir, sealed_dir, cache_dir, @@ -153,6 +178,11 @@ async fn precommit( let deal_duration = deal.deal_proposal.end_block - deal_start; let current_block = state.xt_client.height(false).await?; if current_block > deal_start { + tracing::error!( + "Cannot pre-commit sector, sealing was too slow: current_block: {}, deal_start: {}", + current_block, + deal_start + ); return Err(PipelineError::SealingTooSlow(current_block, deal_start)); } @@ -175,6 +205,8 @@ async fn precommit( primitives_commitment::CommitmentKind::Data, ) .cid(), + // TODO(@th7nder,30/10/2024): xxx + seal_randomness_height: 0, }], ) .await?; @@ -194,6 +226,7 @@ async fn precommit( fn create_replica( sector_id: SectorNumber, + prover_id: [u8; 32], unsealed_dir: Arc, sealed_dir: Arc, cache_dir: Arc, @@ -228,7 +261,7 @@ fn create_replica( cache_dir.as_ref(), unsealed_sector_path, sealed_sector_path, - PROVER_ID, + prover_id, SECTOR_ID, TICKET, &piece_infos, diff --git a/cli/polka-storage-provider/server/src/rpc.rs b/cli/polka-storage-provider/server/src/rpc.rs index ac78a709b..17044e4f2 100644 --- a/cli/polka-storage-provider/server/src/rpc.rs +++ b/cli/polka-storage-provider/server/src/rpc.rs @@ -11,7 +11,10 @@ use tokio::sync::mpsc::UnboundedSender; use tokio_util::sync::CancellationToken; use tracing::{info, instrument}; -use crate::{db::DealDB, pipeline::PipelineMessage}; +use crate::{ + db::DealDB, + pipeline::{PipelineMessage, PreCommitMessage}, +}; /// RPC server shared state. pub struct RpcServerState { @@ -111,12 +114,12 @@ impl StorageProviderRpcServer for RpcServerState { let deal_id = published_deals[0].deal_id; self.pipeline_sender - .send(PipelineMessage::PreCommit { + .send(PipelineMessage::PreCommit(PreCommitMessage { deal, published_deal_id: deal_id, piece_path, piece_cid, - }) + })) .map_err(|e| RpcError::internal_error(e, None))?; Ok(deal_id)