feat: integrate pre-commit/create replica with extrinsic call on pipeline (#483)
th7nder authored Nov 4, 2024
1 parent 879a03a commit af51a9b
Showing 21 changed files with 746 additions and 182 deletions.
4 changes: 4 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default.

24 changes: 12 additions & 12 deletions cli/polka-storage-provider/client/src/commands/proofs.rs
@@ -9,11 +9,10 @@ use mater::CarV2Reader;
 use polka_storage_proofs::{
     porep::{self, sealer::Sealer},
     post::{self, ReplicaInfo},
-    types::PieceInfo,
     ZeroPaddingReader,
 };
 use polka_storage_provider_common::commp::{calculate_piece_commitment, CommPError};
-use primitives_commitment::piece::PaddedPieceSize;
+use primitives_commitment::piece::{PaddedPieceSize, PieceInfo};
 use primitives_proofs::{derive_prover_id, RegisteredPoStProof, RegisteredSealProof};
 use storagext::multipair::{MultiPairArgs, MultiPairSigner};
 use subxt::tx::Signer;
@@ -237,19 +236,18 @@ impl ProofsCommand
                 .map_err(|e| UtilsCommandError::InvalidPieceFile(input_path, e))?
                 .len();
 
-            let piece_file_length =
-                PaddedPieceSize::from_arbitrary_size(piece_file_length).unpadded();
-            let piece_file = ZeroPaddingReader::new(piece_file, *piece_file_length);
+            let piece_file_length = PaddedPieceSize::from_arbitrary_size(piece_file_length);
+            let piece_file = ZeroPaddingReader::new(piece_file, *piece_file_length.unpadded());
 
             let commp = cid::Cid::from_str(commp.as_str())
                 .map_err(|e| UtilsCommandError::InvalidPieceCommP(commp, e))?;
             let piece_info = PieceInfo {
-                commitment: commp
-                    .hash()
-                    .digest()
-                    .try_into()
-                    .expect("CommPs guaranteed to be 32 bytes"),
-                size: *piece_file_length,
+                commitment: primitives_commitment::Commitment::from_cid(
+                    &commp,
+                    primitives_commitment::CommitmentKind::Piece,
+                )
+                .map_err(|e| UtilsCommandError::InvalidPieceType(commp.to_string(), e))?,
+                size: piece_file_length,
             };
 
             let (unsealed_sector_path, unsealed_sector) = file_with_extension(
@@ -267,7 +265,7 @@ impl ProofsCommand {
             println!("Creating sector...");
             let sealer = Sealer::new(seal_proof);
             let piece_infos = sealer
-                .create_sector(vec![(piece_file, piece_info.clone())], unsealed_sector)
+                .create_sector(vec![(piece_file, piece_info)], unsealed_sector)
                 .map_err(|e| UtilsCommandError::GeneratePoRepError(e))?;
 
             println!("Precommitting...");
@@ -437,6 +435,8 @@ pub enum UtilsCommandError {
     InvalidPieceFile(PathBuf, std::io::Error),
     #[error("provided invalid CommP {0}, error: {1}")]
     InvalidPieceCommP(String, cid::Error),
+    #[error("invalid piece type")]
+    InvalidPieceType(String, &'static str),
     #[error("file {0} is invalid CARv2 file {1}")]
     InvalidCARv2(PathBuf, mater::Error),
     #[error("no signer key was provider")]
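Net effect of this change: the CLI keeps the piece length as a typed PaddedPieceSize (the primitives_commitment PieceInfo stores the padded size plus a typed Commitment), and the CommP CID is validated as an actual piece commitment via Commitment::from_cid instead of blindly copying its digest bytes. A minimal sketch of the new flow, using only API names visible in the diff above; the example length, the 127/128 padding convention noted in the comment, and the String error plumbing are illustrative assumptions:

    use std::str::FromStr;

    use primitives_commitment::{
        piece::{PaddedPieceSize, PieceInfo},
        Commitment, CommitmentKind,
    };

    fn piece_info_from_parts(raw_len: u64, commp: &str) -> Result<PieceInfo, String> {
        // Round the raw file length up to a valid padded piece size,
        // e.g. 900 raw bytes -> 1024 padded (1016 unpadded, assuming
        // Filecoin-style 127/128 padding).
        let padded = PaddedPieceSize::from_arbitrary_size(raw_len);

        let cid = cid::Cid::from_str(commp).map_err(|e| e.to_string())?;
        // Unlike the old `commp.hash().digest().try_into()`, this also checks
        // that the CID really encodes a piece commitment.
        let commitment =
            Commitment::from_cid(&cid, CommitmentKind::Piece).map_err(|e| e.to_string())?;

        // The file reader is then zero-padded up to the *unpadded* size, while
        // the PieceInfo records the *padded* size:
        //     ZeroPaddingReader::new(file, *padded.unpadded())
        Ok(PieceInfo {
            commitment,
            size: padded,
        })
    }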
8 changes: 7 additions & 1 deletion cli/polka-storage-provider/common/src/rpc/error.rs
@@ -1,4 +1,4 @@
-use std::fmt::Display;
+use std::{fmt::Display, io};
 
 use jsonrpsee::types::{
     error::{INTERNAL_ERROR_CODE, INVALID_PARAMS_CODE},
@@ -52,3 +52,9 @@ impl From<subxt::Error> for RpcError {
         Self::internal_error(err, None)
     }
 }
+
+impl From<io::Error> for RpcError {
+    fn from(err: io::Error) -> Self {
+        Self::internal_error(err, None)
+    }
+}
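With this From impl, server-side code that touches the filesystem can bubble failures straight into JSON-RPC internal errors with `?`. A hypothetical helper (not part of this commit; the RpcError import path is assumed from the file layout) to show the shape:

    use std::path::Path;

    use polka_storage_provider_common::rpc::RpcError;

    // Hypothetical helper: `?` converts the io::Error into an RpcError
    // internal error via the impl added above.
    fn read_piece_bytes(path: &Path) -> Result<Vec<u8>, RpcError> {
        let bytes = std::fs::read(path)?;
        Ok(bytes)
    }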
7 changes: 4 additions & 3 deletions cli/polka-storage-provider/server/Cargo.toml
@@ -12,25 +12,26 @@ version = "0.1.0"
 mater = { workspace = true }
 polka-storage-proofs = { workspace = true, features = ["std"] }
 polka-storage-provider-common = { workspace = true }
-primitives-commitment = { workspace = true, features = ["std"] }
+primitives-commitment = { workspace = true, features = ["serde", "std"] }
 primitives-proofs = { workspace = true, features = ["clap"] }
 storagext = { workspace = true, features = ["clap", "insecure_url"] }
 
 async-trait = { workspace = true }
 axum = { workspace = true, features = ["macros", "multipart"] }
-cid = { workspace = true, features = ["std"] }
+cid = { workspace = true, features = ["serde", "std"] }
 clap = { workspace = true, features = ["derive"] }
 futures = { workspace = true }
 jsonrpsee = { workspace = true, features = ["http-client", "macros", "server", "ws-client"] }
 rand = { workspace = true }
 rocksdb = { workspace = true }
 sc-cli = { workspace = true }
 serde = { workspace = true, features = ["derive"] }
+serde_json = { workspace = true }
 subxt = { workspace = true }
 tempfile = { workspace = true }
 thiserror = { workspace = true }
 tokio = { workspace = true }
-tokio-util.workspace = true
+tokio-util = { workspace = true, features = ["rt"] }
 tower-http = { workspace = true, features = ["trace"] }
 tracing = { workspace = true }
 tracing-subscriber = { workspace = true, features = ["env-filter"] }
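tokio-util's "rt" feature gates its runtime task helpers (the tokio_util::task module). A plausible reason to enable it here is TaskTracker for supervising spawned pipeline tasks, though the server code that actually uses it is not visible in this excerpt. A minimal sketch under that assumption:

    use tokio_util::task::TaskTracker;

    #[tokio::main]
    async fn main() {
        let tracker = TaskTracker::new();
        tracker.spawn(async {
            // e.g. a long-running pipeline stage
        });
        tracker.close(); // no new tasks; wait() can now complete
        tracker.wait().await; // blocks until every tracked task finishes
    }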
83 changes: 79 additions & 4 deletions cli/polka-storage-provider/server/src/db.rs
@@ -1,8 +1,14 @@
-use std::path::Path;
+use std::{
+    path::Path,
+    sync::atomic::{AtomicU64, Ordering},
+};
 
+use primitives_proofs::SectorNumber;
 use rocksdb::{ColumnFamily, ColumnFamilyDescriptor, Options as DBOptions, DB as RocksDB};
 use storagext::types::market::{ConversionError, DealProposal};
 
+use crate::pipeline::types::Sector;
+
 #[derive(Debug, thiserror::Error)]
 pub enum DBError {
     #[error(transparent)]
@@ -22,11 +28,13 @@ pub enum DBError {
 }
 
 const ACCEPTED_DEAL_PROPOSALS_CF: &str = "accepted_deal_proposals";
+const SECTORS_CF: &str = "sectors";
 
-const COLUMN_FAMILIES: [&str; 1] = [ACCEPTED_DEAL_PROPOSALS_CF];
+const COLUMN_FAMILIES: [&str; 2] = [ACCEPTED_DEAL_PROPOSALS_CF, SECTORS_CF];
 
 pub struct DealDB {
     database: RocksDB,
+    last_sector_number: AtomicU64,
 }
 
 impl DealDB {
@@ -42,9 +50,13 @@ impl DealDB {
             .into_iter()
             .map(|cf_name| ColumnFamilyDescriptor::new(cf_name, DBOptions::default()));
 
-        Ok(Self {
+        let db = Self {
             database: RocksDB::open_cf_descriptors(&opts, path, cfs)?,
-        })
+            last_sector_number: AtomicU64::new(0),
+        };
+
+        db.initialize_biggest_sector_number()?;
+        Ok(db)
     }
 
     fn cf_handle(&self, name: &str) -> &ColumnFamily {
@@ -106,5 +118,68 @@ impl DealDB {
         )?)
     }
 
+    pub fn get_sector(&self, sector_id: SectorNumber) -> Result<Option<Sector>, DBError> {
+        let Some(sector_slice) = self
+            .database
+            .get_pinned_cf(self.cf_handle(SECTORS_CF), sector_id.to_le_bytes())?
+        else {
+            return Ok(None);
+        };
+
+        let sector = serde_json::from_reader(sector_slice.as_ref())
+            // SAFETY: this should never fail since the API sets a sector
+            // if this happens, it means that someone wrote it from a side channel
+            .expect("invalid content was placed in the database from outside this API");
+
+        Ok(Some(sector))
+    }
+
+    pub fn save_sector(&self, sector: &Sector) -> Result<(), DBError> {
+        let cf_handle = self.cf_handle(SECTORS_CF);
+        let key = sector.sector_number.to_le_bytes();
+        let json = serde_json::to_vec(&sector)?;
+
+        self.database.put_cf(cf_handle, key, json)?;
+
+        Ok(())
+    }
+
+    /// Takes all of the existing sectors, finds the maximum sector id.
+    /// The simplest way possible of generating an id.
+    /// This function is private for a reason. It should only be called once at the DealDB initialization.
+    /// And then `last_sector_number` is incremented by `next_sector_number` only.
+    /// If it was called by multiple threads later than initialization, it could cause a race condition and data erasure.
+    fn initialize_biggest_sector_number(&self) -> Result<(), DBError> {
+        let mut biggest_sector_number = 0;
+        for item in self
+            .database
+            .iterator_cf(self.cf_handle(SECTORS_CF), rocksdb::IteratorMode::Start)
+        {
+            let (key, _) = item?;
+            let key: [u8; 8] = key
+                .as_ref()
+                .try_into()
+                .expect("sector's key to be u64 le bytes");
+            let sector_id = SectorNumber::from_le_bytes(key);
+            biggest_sector_number = std::cmp::max(biggest_sector_number, sector_id);
+        }
+
+        // [`Ordering::Relaxed`] can be used here as this function is executed only on start-up and once.
+        // We don't mind, it's just a initialization.
+        self.last_sector_number
+            .store(biggest_sector_number, Ordering::Relaxed);
+        Ok(())
+    }
+
+    /// Atomically increments sector_id counter, so it can be used as an identifier by a sector.
+    /// Prior to all of the calls to this function, `initialize_biggest_sector_id` must be called at the node start-up.
+    pub fn next_sector_number(&self) -> SectorNumber {
+        // [`Ordering::Relaxed`] can be used here, as it's an update on a single variable.
+        // It does not depend on other Atomic variables and it does not matter which thread makes it first.
+        // We just need it to be different on every thread that calls it concurrently, so the ids are not duplicated.
+        let previous = self.last_sector_number.fetch_add(1, Ordering::Relaxed);
+        previous + 1
+    }
+
 // NOTE(@jmg-duarte,03/10/2024): I think that from here onwards we're very close of reinventing the LID, but so be it
 }
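The counter logic above is the whole concurrency story: initialize_biggest_sector_number seeds the atomic once at start-up, and every later caller goes through fetch_add, which returns the previous value atomically, so two concurrent callers can never observe the same number. A self-contained model of that property (plain std, not the PR's code):

    use std::sync::{
        atomic::{AtomicU64, Ordering},
        Arc,
    };

    fn main() {
        // Stands in for `last_sector_number` after the start-up scan.
        let counter = Arc::new(AtomicU64::new(0));

        let handles: Vec<_> = (0..8)
            .map(|_| {
                let counter = Arc::clone(&counter);
                // Mirrors `next_sector_number`: previous value + 1.
                std::thread::spawn(move || counter.fetch_add(1, Ordering::Relaxed) + 1)
            })
            .collect();

        let mut ids: Vec<u64> = handles.into_iter().map(|h| h.join().unwrap()).collect();
        ids.sort();
        // Unique and gapless: 1..=8, regardless of thread interleaving.
        assert_eq!(ids, (1u64..=8).collect::<Vec<u64>>());
    }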
(Diffs for the remaining 16 changed files are not shown here.)