Skip to content

Commit

Permalink
feat: add skeleton for pre-committ and message queue
Browse files Browse the repository at this point in the history
  • Loading branch information
th7nder committed Oct 28, 2024
1 parent 3ec396c commit 3ebf899
Show file tree
Hide file tree
Showing 10 changed files with 222 additions and 85 deletions.
Binary file modified cli/artifacts/metadata.scale
Binary file not shown.
8 changes: 7 additions & 1 deletion cli/polka-storage-provider/common/src/rpc/error.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
use std::fmt::Display;
use std::{fmt::Display, io};

use jsonrpsee::types::{
error::{INTERNAL_ERROR_CODE, INVALID_PARAMS_CODE},
Expand Down Expand Up @@ -52,3 +52,9 @@ impl From<subxt::Error> for RpcError {
Self::internal_error(err, None)
}
}

impl From<io::Error> for RpcError {
fn from(err: io::Error) -> Self {
Self::internal_error(err, None)
}
}
63 changes: 50 additions & 13 deletions cli/polka-storage-provider/server/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,12 +3,14 @@
#![deny(clippy::unwrap_used)]

mod db;
mod pipeline;
mod rpc;
mod storage;

use std::{env::temp_dir, net::SocketAddr, path::PathBuf, sync::Arc, time::Duration};

use clap::Parser;
use pipeline::PipelineMessage;
use polka_storage_provider_common::rpc::ServerInfo;
use primitives_proofs::{RegisteredPoStProof, RegisteredSealProof};
use rand::Rng;
Expand All @@ -22,14 +24,15 @@ use subxt::{
},
tx::Signer,
};
use tokio::task::JoinError;
use tokio::{sync::mpsc::UnboundedReceiver, task::JoinError};
use tokio_util::sync::CancellationToken;
use tracing::level_filters::LevelFilter;
use tracing_subscriber::{fmt, layer::SubscriberExt, util::SubscriberInitExt, EnvFilter};
use url::Url;

use crate::{
db::{DBError, DealDB},
pipeline::{start_pipeline, PipelineState},
rpc::{start_rpc_server, RpcServerState},
storage::{start_upload_server, StorageServerState},
};
Expand Down Expand Up @@ -268,7 +271,7 @@ impl TryFrom<ServerArguments> for ServerConfiguration {

impl ServerConfiguration {
pub async fn run(self) -> Result<(), ServerError> {
let (storage_state, rpc_state) = self.setup().await?;
let (storage_state, rpc_state, pipeline_state, rx) = self.setup().await?;

let cancellation_token = CancellationToken::new();

Expand All @@ -280,6 +283,11 @@ impl ServerConfiguration {
Arc::new(storage_state),
cancellation_token.child_token(),
));
let pipeline_task = tokio::spawn(start_pipeline(
Arc::new(pipeline_state),
rx,
cancellation_token.child_token(),
));

// Wait for SIGTERM on the main thread and once received "unblock"
tokio::signal::ctrl_c()
Expand All @@ -291,7 +299,8 @@ impl ServerConfiguration {
tracing::info!("sent shutdown signal");

// Wait for the tasks to finish
let (upload_result, rpc_task) = tokio::join!(storage_task, rpc_task);
let (upload_result, rpc_task, pipeline_task) =
tokio::join!(storage_task, rpc_task, pipeline_task);

// Log errors
let upload_result = upload_result
Expand All @@ -305,20 +314,39 @@ impl ServerConfiguration {
let _ = ok.as_ref().inspect_err(|err| tracing::error!(%err));
});

let pipeline_task = pipeline_task
.inspect_err(|err| tracing::error!(%err))
.inspect(|ok| {
let _ = ok.as_ref().inspect_err(|err| tracing::error!(%err));
});

// Exit with error
upload_result??;
rpc_task??;
pipeline_task??;

Ok(())
}

async fn setup(self) -> Result<(StorageServerState, RpcServerState), ServerError> {
let xt_client = ServerConfiguration::setup_storagext_client(
self.node_url,
&self.multi_pair_signer,
&self.post_proof,
)
.await?;
async fn setup(
self,
) -> Result<
(
StorageServerState,
RpcServerState,
PipelineState,
UnboundedReceiver<PipelineMessage>,
),
ServerError,
> {
let xt_client = Arc::new(
ServerConfiguration::setup_storagext_client(
self.node_url,
&self.multi_pair_signer,
&self.post_proof,
)
.await?,
);
let deal_database = Arc::new(DealDB::new(self.database_directory)?);

// Car piece storage directory — i.e. the CAR archives from the input streams
Expand All @@ -334,6 +362,8 @@ impl ServerConfiguration {
tokio::fs::create_dir_all(unsealed_piece_storage_dir.as_ref()).await?;
tokio::fs::create_dir_all(sealed_piece_storage_dir.as_ref()).await?;

let (tx, rx) = tokio::sync::mpsc::unbounded_channel::<PipelineMessage>();

let storage_state = StorageServerState {
car_piece_storage_dir: car_piece_storage_dir.clone(),
deal_db: deal_database.clone(),
Expand All @@ -349,15 +379,22 @@ impl ServerConfiguration {
),
deal_db: deal_database.clone(),
car_piece_storage_dir: car_piece_storage_dir.clone(),
xt_client: xt_client.clone(),
xt_keypair: self.multi_pair_signer.clone(),
listen_address: self.rpc_listen_address,
pipeline_sender: tx,
};

let pipeline_state = PipelineState {
server_info: rpc_state.server_info.clone(),
unsealed_piece_storage_dir,
sealed_piece_storage_dir,
sealing_cache_dir,
xt_client,
xt_client: xt_client,
xt_keypair: self.multi_pair_signer,
listen_address: self.rpc_listen_address,
};

Ok((storage_state, rpc_state))
Ok((storage_state, rpc_state, pipeline_state, rx))
}

async fn setup_storagext_client(
Expand Down
149 changes: 149 additions & 0 deletions cli/polka-storage-provider/server/src/pipeline.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,149 @@
use std::{path::PathBuf, sync::Arc};

use cid::Cid;
use polka_storage_proofs::porep::sealer::{prepare_piece, PreCommitOutput, Sealer};
use polka_storage_provider_common::rpc::ServerInfo;
use primitives_proofs::RegisteredSealProof;
use tokio::sync::mpsc::UnboundedReceiver;
use tokio_util::sync::CancellationToken;

// PLACEHOLDERS!!!!!
const SECTOR_ID: u64 = 77;
const PROVER_ID: [u8; 32] = [0u8; 32];
const TICKET: [u8; 32] = [12u8; 32];
// const SEED: [u8; 32] = [13u8; 32];
const SECTOR_EXPIRATION_MARGIN: u64 = 20;

#[derive(Debug)]
pub enum PipelineMessage {
PreCommit,
}

/// Pipeline shared state.
pub struct PipelineState {
pub server_info: ServerInfo,
pub unsealed_piece_storage_dir: Arc<PathBuf>,
pub sealed_piece_storage_dir: Arc<PathBuf>,
pub sealing_cache_dir: Arc<PathBuf>,

pub xt_client: Arc<storagext::Client>,
pub xt_keypair: storagext::multipair::MultiPairSigner,
}

#[tracing::instrument(skip_all)]
pub async fn start_pipeline(
state: Arc<PipelineState>,
mut receiver: UnboundedReceiver<PipelineMessage>,
token: CancellationToken,
) -> Result<(), std::io::Error> {
loop {
tokio::select! {
msg = receiver.recv() => {
tracing::info!("Received msg: {:?}", msg);
todo!();
},
_ = token.cancelled() => {
tracing::info!("Pipeline has been stopped...");
return Ok(())
},
}
}
}

fn create_replica(
unsealed_dir: Arc<PathBuf>,
sealed_dir: Arc<PathBuf>,
cache_dir: Arc<PathBuf>,
piece_path: PathBuf,
seal_proof: RegisteredSealProof,
piece_cid: Cid,
) -> Result<PreCommitOutput, polka_storage_proofs::porep::PoRepError> {
let piece_commitment: [u8; 32] = piece_cid
.hash()
.digest()
.try_into()
.expect("piece_cid should have been validated on proposal");

let unsealed_sector_path = unsealed_dir.join(piece_cid.to_string());
let sealed_sector_path = {
let path = sealed_dir.join(piece_cid.to_string());
// We need to create the file ourselves, even though that's not documented
std::fs::File::create(&path)?;
path
};

let sealer = Sealer::new(seal_proof);

let prepared_piece = prepare_piece(piece_path, piece_commitment)?;

let piece_infos = {
// The scope creates an implicit drop of the file handler
// avoiding reading issues later on
let sector_writer = std::fs::File::create(&unsealed_sector_path)?;
sealer.create_sector(vec![prepared_piece], sector_writer)?
};

sealer.precommit_sector(
cache_dir.as_ref(),
unsealed_sector_path,
sealed_sector_path,
PROVER_ID,
SECTOR_ID,
TICKET,
&piece_infos,
)
}

/*
let deal_start = deal.deal_proposal.start_block;
let deal_duration = deal.deal_proposal.end_block - deal_start;
let sealing_result = sealing_result.await.map_err(|err| RpcError::internal_error(err, None))??;
tracing::info!("Created sector's replica: {:?}", sealing_result);
// Questions to be answered:
// * what happens if some of it fails? SP will be slashed, and there is no error reporting?
// * where do we save the state of a sector/deals, how do we keep track of it?
let sealing_result: JoinHandle<Result<PreCommitOutput, RpcError>> =
tokio::task::spawn_blocking(move || {
});
let current_block = self.xt_client.height(false).await?;
if current_block > deal_start {
return Err(RpcError::internal_error(format!("Pre-commit took too much time... Cannot start pre-commit now, current_block: {}, deal_start: {}", current_block, deal_start), None));
}
let result = self
.xt_client
.pre_commit_sectors(
&self.xt_keypair,
vec![SectorPreCommitInfo {
deal_ids: bounded_vec![deal_id],
expiration: deal_start + deal_duration + SECTOR_EXPIRATION_MARGIN,
sector_number: SECTOR_ID,
seal_proof,
sealed_cid: primitives_commitment::Commitment::new(
sealing_result.comm_r,
primitives_commitment::CommitmentKind::Replica,
)
.cid(),
unsealed_cid: primitives_commitment::Commitment::new(
sealing_result.comm_d,
primitives_commitment::CommitmentKind::Data,
)
.cid(),
}],
)
.await?;
let precommited_sectors = result
.events
.find::<storagext::runtime::storage_provider::events::SectorsPreCommitted>()
.collect::<Result<Vec<_>, _>>()
.map_err(|err| RpcError::internal_error(err, None))?;
*/
Loading

0 comments on commit 3ebf899

Please sign in to comment.