Skip to content

Commit

Permalink
connect more dots
Browse files Browse the repository at this point in the history
  • Loading branch information
th7nder committed Oct 29, 2024
1 parent 546dc23 commit d00debe
Show file tree
Hide file tree
Showing 3 changed files with 174 additions and 81 deletions.
20 changes: 10 additions & 10 deletions cli/polka-storage-provider/server/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -56,10 +56,10 @@ const RETRY_NUMBER: u32 = 5;
const CAR_PIECE_DIRECTORY_NAME: &str = "car";

/// Name for the directory where the unsealed pieces are kept.
const UNSEALED_PIECE_DIRECTORY_NAME: &str = "unsealed";
const UNSEALED_SECTOR_DIRECTORY_NAME: &str = "unsealed";

/// Name for the directory where the sealed pieces are kept.
const SEALED_PIECE_DIRECTORY_NAME: &str = "sealed";
const SEALED_SECTOR_DIRECTORY_NAME: &str = "sealed";

/// Name for the directory where the sealing cache is kept.
const SEALING_CACHE_DIRECTORY_NANE: &str = "cache";
Expand Down Expand Up @@ -351,16 +351,16 @@ impl ServerConfiguration {

// Car piece storage directory — i.e. the CAR archives from the input streams
let car_piece_storage_dir = Arc::new(self.storage_directory.join(CAR_PIECE_DIRECTORY_NAME));
let unsealed_piece_storage_dir =
Arc::new(self.storage_directory.join(UNSEALED_PIECE_DIRECTORY_NAME));
let sealed_piece_storage_dir =
Arc::new(self.storage_directory.join(SEALED_PIECE_DIRECTORY_NAME));
let unsealed_sector_storage_dir =
Arc::new(self.storage_directory.join(UNSEALED_SECTOR_DIRECTORY_NAME));
let sealed_sector_storage_dir =
Arc::new(self.storage_directory.join(SEALED_SECTOR_DIRECTORY_NAME));
let sealing_cache_dir = Arc::new(self.storage_directory.join(SEALING_CACHE_DIRECTORY_NANE));

// Create the storage directories
tokio::fs::create_dir_all(car_piece_storage_dir.as_ref()).await?;
tokio::fs::create_dir_all(unsealed_piece_storage_dir.as_ref()).await?;
tokio::fs::create_dir_all(sealed_piece_storage_dir.as_ref()).await?;
tokio::fs::create_dir_all(unsealed_sector_storage_dir.as_ref()).await?;
tokio::fs::create_dir_all(sealed_sector_storage_dir.as_ref()).await?;

let (tx, rx) = tokio::sync::mpsc::unbounded_channel::<PipelineMessage>();

Expand All @@ -387,8 +387,8 @@ impl ServerConfiguration {

let pipeline_state = PipelineState {
server_info: rpc_state.server_info.clone(),
unsealed_piece_storage_dir,
sealed_piece_storage_dir,
unsealed_sectors_dir: unsealed_sector_storage_dir,
sealed_sectors_dir: sealed_sector_storage_dir,
sealing_cache_dir,
xt_client: xt_client,
xt_keypair: self.multi_pair_signer,
Expand Down
221 changes: 154 additions & 67 deletions cli/polka-storage-provider/server/src/pipeline.rs
Original file line number Diff line number Diff line change
@@ -1,29 +1,58 @@
use std::{path::PathBuf, sync::Arc};

use cid::Cid;
use polka_storage_proofs::porep::sealer::{prepare_piece, PreCommitOutput, Sealer};
use polka_storage_proofs::porep::{
sealer::{prepare_piece, PreCommitOutput, Sealer},
PoRepError,
};
use polka_storage_provider_common::rpc::ServerInfo;
use primitives_proofs::RegisteredSealProof;
use tokio::sync::mpsc::UnboundedReceiver;
use primitives_proofs::{RegisteredSealProof, SectorNumber};
use storagext::{
types::{market::ClientDealProposal, storage_provider::SectorPreCommitInfo},
StorageProviderClientExt, SystemClientExt,
};
use tokio::{
sync::mpsc::UnboundedReceiver,
task::{JoinError, JoinHandle, JoinSet},
};
use tokio_util::sync::CancellationToken;

// PLACEHOLDERS!!!!!
// TODO(@th7nder,29/10/2024): #474
const SECTOR_ID: u64 = 77;
// TODO(@th7nder,29/10/2024): fix after #485 is merged
const PROVER_ID: [u8; 32] = [0u8; 32];
// TODO(@th7nder,29/10/2024): get from pallet randomness
const TICKET: [u8; 32] = [12u8; 32];
// const SEED: [u8; 32] = [13u8; 32];
const SECTOR_EXPIRATION_MARGIN: u64 = 20;

#[derive(Debug)]
pub enum PipelineMessage {
PreCommit,
PreCommit {
deal: ClientDealProposal,
published_deal_id: u64,
piece_path: PathBuf,
piece_cid: Cid,
},
}

#[derive(Debug, thiserror::Error)]
pub enum PipelineError {
#[error(transparent)]
PoRepError(#[from] PoRepError),
#[error(transparent)]
Join(#[from] JoinError),
#[error("Pre-commit took too much time... Cannot start pre-commit now, current_block: {0}, deal_start: {1}")]
SealingTooSlow(u64, u64),
#[error(transparent)]
Subxt(#[from] subxt::Error),
}
/// Pipeline shared state.
pub struct PipelineState {
pub server_info: ServerInfo,
pub unsealed_piece_storage_dir: Arc<PathBuf>,
pub sealed_piece_storage_dir: Arc<PathBuf>,
pub unsealed_sectors_dir: Arc<PathBuf>,
pub sealed_sectors_dir: Arc<PathBuf>,
pub sealing_cache_dir: Arc<PathBuf>,

pub xt_client: Arc<storagext::Client>,
Expand All @@ -36,21 +65,135 @@ pub async fn start_pipeline(
mut receiver: UnboundedReceiver<PipelineMessage>,
token: CancellationToken,
) -> Result<(), std::io::Error> {
let mut futs = JoinSet::new();

loop {
tokio::select! {
msg = receiver.recv() => {
tracing::info!("Received msg: {:?}", msg);
todo!();
match msg {
Some(msg) => {
match msg {
PipelineMessage::PreCommit { deal, published_deal_id, piece_path, piece_cid } => {
futs.spawn(precommit(state.clone(), deal, published_deal_id, SECTOR_ID, piece_path, piece_cid, token.clone()));
},
}
},
None => {
tracing::info!("Channel has been closed...");
break;
},
}
},
_ = token.cancelled() => {
tracing::info!("Pipeline has been stopped...");
return Ok(())
tracing::info!("Pipeline has been stopped by the cancellation token...");
break;
},
}
}

// TODO: should we propgate somehow inner errors to the main thread?
while let Some(res) = futs.join_next().await {
let _ = res.inspect_err(|err| tracing::error!(%err)).inspect(|ok| {
let _ = ok.as_ref().inspect_err(|err| tracing::error!(%err));
});
}

Ok(())
}

async fn precommit(
state: Arc<PipelineState>,
deal: ClientDealProposal,
deal_id: u64,
sector_id: SectorNumber,
piece_path: PathBuf,
piece_cid: Cid,
token: CancellationToken,
) -> Result<(), PipelineError> {
tracing::debug!(
"Starting pre-commit task for deal {}, sector, {}",
deal_id,
sector_id
);
// Questions to be answered:
// * what happens if some of it fails? SP will be slashed, and there is no error reporting?
// * where do we save the state of a sector/deals, how do we keep track of it?
let sealing_handle: JoinHandle<Result<PreCommitOutput, _>> = {
let state = state.clone();

tokio::task::spawn_blocking(move || {
let cache_dir = state.sealing_cache_dir.clone();
let unsealed_dir = state.sealed_sectors_dir.clone();
let sealed_dir = state.sealed_sectors_dir.clone();
create_replica(
sector_id,
unsealed_dir,
sealed_dir,
cache_dir,
piece_path,
state.server_info.seal_proof,
piece_cid,
)
})
};

let sealing_output = tokio::select! {
res = sealing_handle => {
res??
},
_ = token.cancelled() => {
tracing::warn!("Cancelled sealing process...");
return Ok(())
}
};
tracing::info!("Created sector's replica: {:?}", sealing_output);

let deal_start = deal.deal_proposal.start_block;
let deal_duration = deal.deal_proposal.end_block - deal_start;
let current_block = state.xt_client.height(false).await?;
if current_block > deal_start {
return Err(PipelineError::SealingTooSlow(current_block, deal_start));
}

let result = state
.xt_client
.pre_commit_sectors(
&state.xt_keypair,
vec![SectorPreCommitInfo {
deal_ids: vec![deal_id],
expiration: deal_start + deal_duration + SECTOR_EXPIRATION_MARGIN,
sector_number: sector_id,
seal_proof: state.server_info.seal_proof,
sealed_cid: primitives_commitment::Commitment::new(
sealing_output.comm_r,
primitives_commitment::CommitmentKind::Replica,
)
.cid(),
unsealed_cid: primitives_commitment::Commitment::new(
sealing_output.comm_d,
primitives_commitment::CommitmentKind::Data,
)
.cid(),
}],
)
.await?;

let precommited_sectors = result
.events
.find::<storagext::runtime::storage_provider::events::SectorsPreCommitted>()
.collect::<Result<Vec<_>, _>>()?;

tracing::info!(
"Successfully pre-commited sectors on-chain: {:?}",
precommited_sectors
);

Ok(())
}

fn create_replica(
sector_id: SectorNumber,
unsealed_dir: Arc<PathBuf>,
sealed_dir: Arc<PathBuf>,
cache_dir: Arc<PathBuf>,
Expand All @@ -64,18 +207,16 @@ fn create_replica(
.try_into()
.expect("piece_cid should have been validated on proposal");

let unsealed_sector_path = unsealed_dir.join(piece_cid.to_string());
let unsealed_sector_path = unsealed_dir.join(sector_id.to_string());
let sealed_sector_path = {
let path = sealed_dir.join(piece_cid.to_string());
let path = sealed_dir.join(sector_id.to_string());
// We need to create the file ourselves, even though that's not documented
std::fs::File::create(&path)?;
path
};

let sealer = Sealer::new(seal_proof);

let prepared_piece = prepare_piece(piece_path, piece_commitment)?;

let piece_infos = {
// The scope creates an implicit drop of the file handler
// avoiding reading issues later on
Expand All @@ -93,57 +234,3 @@ fn create_replica(
&piece_infos,
)
}

/*
let deal_start = deal.deal_proposal.start_block;
let deal_duration = deal.deal_proposal.end_block - deal_start;
let sealing_result = sealing_result.await.map_err(|err| RpcError::internal_error(err, None))??;
tracing::info!("Created sector's replica: {:?}", sealing_result);
// Questions to be answered:
// * what happens if some of it fails? SP will be slashed, and there is no error reporting?
// * where do we save the state of a sector/deals, how do we keep track of it?
let sealing_result: JoinHandle<Result<PreCommitOutput, RpcError>> =
tokio::task::spawn_blocking(move || {
});
let current_block = self.xt_client.height(false).await?;
if current_block > deal_start {
return Err(RpcError::internal_error(format!("Pre-commit took too much time... Cannot start pre-commit now, current_block: {}, deal_start: {}", current_block, deal_start), None));
}
let result = self
.xt_client
.pre_commit_sectors(
&self.xt_keypair,
vec![SectorPreCommitInfo {
deal_ids: bounded_vec![deal_id],
expiration: deal_start + deal_duration + SECTOR_EXPIRATION_MARGIN,
sector_number: SECTOR_ID,
seal_proof,
sealed_cid: primitives_commitment::Commitment::new(
sealing_result.comm_r,
primitives_commitment::CommitmentKind::Replica,
)
.cid(),
unsealed_cid: primitives_commitment::Commitment::new(
sealing_result.comm_d,
primitives_commitment::CommitmentKind::Data,
)
.cid(),
}],
)
.await?;
let precommited_sectors = result
.events
.find::<storagext::runtime::storage_provider::events::SectorsPreCommitted>()
.collect::<Result<Vec<_>, _>>()
.map_err(|err| RpcError::internal_error(err, None))?;
*/
14 changes: 10 additions & 4 deletions cli/polka-storage-provider/server/src/rpc.rs
Original file line number Diff line number Diff line change
Expand Up @@ -94,7 +94,7 @@ impl StorageProviderRpcServer for RpcServerState {
// it just requires some API design
let result = self
.xt_client
.publish_signed_storage_deals(&self.xt_keypair, vec![deal])
.publish_signed_storage_deals(&self.xt_keypair, vec![deal.clone()])
.await?;

let published_deals = result
Expand All @@ -107,12 +107,18 @@ impl StorageProviderRpcServer for RpcServerState {
// an error MUST've happened
debug_assert_eq!(published_deals.len(), 1);

// We always publish only 1 deal
let deal_id = published_deals[0].deal_id;

self.pipeline_sender
.send(PipelineMessage::PreCommit)
.send(PipelineMessage::PreCommit {
deal,
published_deal_id: deal_id,
piece_path,
piece_cid,
})
.map_err(|e| RpcError::internal_error(e, None))?;

// We always publish only 1 deal
let deal_id = published_deals[0].deal_id;
Ok(deal_id)
}
}
Expand Down

0 comments on commit d00debe

Please sign in to comment.