Skip to content

Commit

Permalink
adjust from feedback, lots of it
Browse files Browse the repository at this point in the history
  • Loading branch information
th7nder committed Oct 29, 2024
1 parent 19c5e2a commit 9e1d2dd
Show file tree
Hide file tree
Showing 3 changed files with 62 additions and 26 deletions.
8 changes: 4 additions & 4 deletions cli/polka-storage-provider/server/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -364,7 +364,7 @@ impl ServerConfiguration {
tokio::fs::create_dir_all(unsealed_sector_storage_dir.as_ref()).await?;
tokio::fs::create_dir_all(sealed_sector_storage_dir.as_ref()).await?;

let (tx, rx) = tokio::sync::mpsc::unbounded_channel::<PipelineMessage>();
let (pipeline_tx, pipeline_rx) = tokio::sync::mpsc::unbounded_channel::<PipelineMessage>();

let storage_state = StorageServerState {
car_piece_storage_dir: car_piece_storage_dir.clone(),
Expand All @@ -384,19 +384,19 @@ impl ServerConfiguration {
xt_client: xt_client.clone(),
xt_keypair: self.multi_pair_signer.clone(),
listen_address: self.rpc_listen_address,
pipeline_sender: tx,
pipeline_sender: pipeline_tx,
};

let pipeline_state = PipelineState {
server_info: rpc_state.server_info.clone(),
unsealed_sectors_dir: unsealed_sector_storage_dir,
sealed_sectors_dir: sealed_sector_storage_dir,
sealing_cache_dir,
xt_client: xt_client,
xt_client,
xt_keypair: self.multi_pair_signer,
};

Ok((storage_state, rpc_state, pipeline_state, rx))
Ok((storage_state, rpc_state, pipeline_state, pipeline_rx))
}

async fn setup_storagext_client(
Expand Down
71 changes: 52 additions & 19 deletions cli/polka-storage-provider/server/src/pipeline.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,11 +6,12 @@ use polka_storage_proofs::porep::{
PoRepError,
};
use polka_storage_provider_common::rpc::ServerInfo;
use primitives_proofs::{RegisteredSealProof, SectorNumber};
use primitives_proofs::{derive_prover_id, RegisteredSealProof, SectorNumber};
use storagext::{
types::{market::ClientDealProposal, storage_provider::SectorPreCommitInfo},
StorageProviderClientExt, SystemClientExt,
};
use subxt::tx::Signer;
use tokio::{
sync::mpsc::UnboundedReceiver,
task::{JoinError, JoinHandle, JoinSet},
Expand All @@ -20,21 +21,22 @@ use tokio_util::sync::CancellationToken;
// PLACEHOLDERS!!!!!
// TODO(@th7nder,29/10/2024): #474
const SECTOR_ID: u64 = 77;
// TODO(@th7nder,29/10/2024): fix after #485 is merged
const PROVER_ID: [u8; 32] = [0u8; 32];
// TODO(@th7nder,29/10/2024): get from pallet randomness
const TICKET: [u8; 32] = [12u8; 32];
// const SEED: [u8; 32] = [13u8; 32];
const SECTOR_EXPIRATION_MARGIN: u64 = 20;

#[derive(Debug)]
pub struct PreCommitMessage {
pub deal: ClientDealProposal,
pub published_deal_id: u64,
pub piece_path: PathBuf,
pub piece_cid: Cid,
}

#[derive(Debug)]
pub enum PipelineMessage {
PreCommit {
deal: ClientDealProposal,
published_deal_id: u64,
piece_path: PathBuf,
piece_cid: Cid,
},
PreCommit(PreCommitMessage),
}

#[derive(Debug, thiserror::Error)]
Expand Down Expand Up @@ -73,11 +75,7 @@ pub async fn start_pipeline(
tracing::info!("Received msg: {:?}", msg);
match msg {
Some(msg) => {
match msg {
PipelineMessage::PreCommit { deal, published_deal_id, piece_path, piece_cid } => {
futs.spawn(precommit(state.clone(), deal, published_deal_id, SECTOR_ID, piece_path, piece_cid, token.clone()));
},
}
process(&mut futs, msg, state.clone(), token.clone());
},
None => {
tracing::info!("Channel has been closed...");
Expand All @@ -102,6 +100,32 @@ pub async fn start_pipeline(
Ok(())
}

fn process(
futs: &mut JoinSet<Result<(), PipelineError>>,
msg: PipelineMessage,
state: Arc<PipelineState>,
token: CancellationToken,
) {
match msg {
PipelineMessage::PreCommit(PreCommitMessage {
deal,
published_deal_id,
piece_path,
piece_cid,
}) => {
futs.spawn(precommit(
state,
deal,
published_deal_id,
SECTOR_ID,
piece_path,
piece_cid,
token,
));
}
}
}

async fn precommit(
state: Arc<PipelineState>,
deal: ClientDealProposal,
Expand All @@ -121,13 +145,14 @@ async fn precommit(
// * where do we save the state of a sector/deals, how do we keep track of it?
let sealing_handle: JoinHandle<Result<PreCommitOutput, _>> = {
let state = state.clone();

let prover_id = derive_prover_id(state.xt_keypair.account_id());
let cache_dir = state.sealing_cache_dir.clone();
let unsealed_dir = state.sealed_sectors_dir.clone();
let sealed_dir = state.sealed_sectors_dir.clone();
tokio::task::spawn_blocking(move || {
let cache_dir = state.sealing_cache_dir.clone();
let unsealed_dir = state.sealed_sectors_dir.clone();
let sealed_dir = state.sealed_sectors_dir.clone();
create_replica(
sector_id,
prover_id,
unsealed_dir,
sealed_dir,
cache_dir,
Expand All @@ -153,6 +178,11 @@ async fn precommit(
let deal_duration = deal.deal_proposal.end_block - deal_start;
let current_block = state.xt_client.height(false).await?;
if current_block > deal_start {
tracing::error!(
"Cannot pre-commit sector, sealing was too slow: current_block: {}, deal_start: {}",
current_block,
deal_start
);
return Err(PipelineError::SealingTooSlow(current_block, deal_start));
}

Expand All @@ -175,6 +205,8 @@ async fn precommit(
primitives_commitment::CommitmentKind::Data,
)
.cid(),
// TODO(@th7nder,30/10/2024): xxx
seal_randomness_height: 0,
}],
)
.await?;
Expand All @@ -194,6 +226,7 @@ async fn precommit(

fn create_replica(
sector_id: SectorNumber,
prover_id: [u8; 32],
unsealed_dir: Arc<PathBuf>,
sealed_dir: Arc<PathBuf>,
cache_dir: Arc<PathBuf>,
Expand Down Expand Up @@ -228,7 +261,7 @@ fn create_replica(
cache_dir.as_ref(),
unsealed_sector_path,
sealed_sector_path,
PROVER_ID,
prover_id,
SECTOR_ID,
TICKET,
&piece_infos,
Expand Down
9 changes: 6 additions & 3 deletions cli/polka-storage-provider/server/src/rpc.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,10 @@ use tokio::sync::mpsc::UnboundedSender;
use tokio_util::sync::CancellationToken;
use tracing::{info, instrument};

use crate::{db::DealDB, pipeline::PipelineMessage};
use crate::{
db::DealDB,
pipeline::{PipelineMessage, PreCommitMessage},
};

/// RPC server shared state.
pub struct RpcServerState {
Expand Down Expand Up @@ -111,12 +114,12 @@ impl StorageProviderRpcServer for RpcServerState {
let deal_id = published_deals[0].deal_id;

self.pipeline_sender
.send(PipelineMessage::PreCommit {
.send(PipelineMessage::PreCommit(PreCommitMessage {
deal,
published_deal_id: deal_id,
piece_path,
piece_cid,
})
}))
.map_err(|e| RpcError::internal_error(e, None))?;

Ok(deal_id)
Expand Down

0 comments on commit 9e1d2dd

Please sign in to comment.