feat: integrate pre-commit/create replica with extrinsic call on pipeline #483

Merged · 29 commits · Nov 4, 2024

Changes from all commits (29 commits)
8aaea2d
feat: add skeleton for pre-committ and message queue
th7nder Oct 28, 2024
f8eb787
connect more dots
th7nder Oct 29, 2024
eae5a7b
adjust from feedback, lots of it
th7nder Oct 29, 2024
f913ff9
Don't leak memory
th7nder Oct 30, 2024
d3d6fce
save progress
th7nder Oct 30, 2024
479b207
more keyboard hitting
th7nder Oct 30, 2024
6cb0c85
more work
th7nder Oct 31, 2024
9f21151
PR feedback
th7nder Oct 31, 2024
31d43a4
add more docs
th7nder Oct 31, 2024
fd641c3
add cancellation safety
th7nder Oct 31, 2024
f3ec431
make polka-storage-provider-client work again
th7nder Oct 31, 2024
a5916e4
make sealer nice again
th7nder Oct 31, 2024
3a6ae45
format and optimize imports
th7nder Oct 31, 2024
3c7810d
almost working
th7nder Oct 31, 2024
6cdb527
debugging bugging
th7nder Oct 31, 2024
d4c09b0
fix(pallet-storage-provider): disable commd checking cause its buggy
th7nder Oct 31, 2024
583c9e1
chore: taplo
th7nder Oct 31, 2024
394cb27
docs(polka-storage-provider-node): atomic updates in deal db
th7nder Nov 1, 2024
a180093
refactor: comimtment to a variable
th7nder Nov 1, 2024
150ba16
docs(polka-storage-proofs): sealer
th7nder Nov 1, 2024
dd9d98a
refactor(polka-storage-provider-server): get rid of unpaddedbytesamou…
th7nder Nov 1, 2024
6f02d40
docs(polka-storage-provider-server): precommitoutput docs
th7nder Nov 1, 2024
5006225
docs(polka-storage-provider-server): sector state
th7nder Nov 1, 2024
c4a72b7
docs(polka-storage-provider-server): types
th7nder Nov 1, 2024
c4660a0
docs: improvemet
th7nder Nov 1, 2024
5b1d681
docs: improve docs on precommit
th7nder Nov 1, 2024
614aa75
docs
th7nder Nov 1, 2024
cf01089
chore: fix clippy
th7nder Nov 4, 2024
c6ff3f0
docs: add todo for precommit
th7nder Nov 4, 2024
4 changes: 4 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default.

24 changes: 12 additions & 12 deletions cli/polka-storage-provider/client/src/commands/proofs.rs
@@ -9,11 +9,10 @@ use mater::CarV2Reader;
use polka_storage_proofs::{
porep::{self, sealer::Sealer},
post::{self, ReplicaInfo},
types::PieceInfo,
ZeroPaddingReader,
};
use polka_storage_provider_common::commp::{calculate_piece_commitment, CommPError};
use primitives_commitment::piece::PaddedPieceSize;
use primitives_commitment::piece::{PaddedPieceSize, PieceInfo};
use primitives_proofs::{derive_prover_id, RegisteredPoStProof, RegisteredSealProof};
use storagext::multipair::{MultiPairArgs, MultiPairSigner};
use subxt::tx::Signer;
@@ -237,19 +236,18 @@ impl ProofsCommand {
.map_err(|e| UtilsCommandError::InvalidPieceFile(input_path, e))?
.len();

let piece_file_length =
PaddedPieceSize::from_arbitrary_size(piece_file_length).unpadded();
let piece_file = ZeroPaddingReader::new(piece_file, *piece_file_length);
let piece_file_length = PaddedPieceSize::from_arbitrary_size(piece_file_length);
let piece_file = ZeroPaddingReader::new(piece_file, *piece_file_length.unpadded());

let commp = cid::Cid::from_str(commp.as_str())
.map_err(|e| UtilsCommandError::InvalidPieceCommP(commp, e))?;
let piece_info = PieceInfo {
commitment: commp
.hash()
.digest()
.try_into()
.expect("CommPs guaranteed to be 32 bytes"),
size: *piece_file_length,
commitment: primitives_commitment::Commitment::from_cid(
&commp,
primitives_commitment::CommitmentKind::Piece,
)
.map_err(|e| UtilsCommandError::InvalidPieceType(commp.to_string(), e))?,
size: piece_file_length,
};

let (unsealed_sector_path, unsealed_sector) = file_with_extension(
@@ -267,7 +265,7 @@
println!("Creating sector...");
let sealer = Sealer::new(seal_proof);
let piece_infos = sealer
.create_sector(vec![(piece_file, piece_info.clone())], unsealed_sector)
.create_sector(vec![(piece_file, piece_info)], unsealed_sector)
.map_err(|e| UtilsCommandError::GeneratePoRepError(e))?;

println!("Precommitting...");
@@ -437,6 +435,8 @@ pub enum UtilsCommandError {
InvalidPieceFile(PathBuf, std::io::Error),
#[error("provided invalid CommP {0}, error: {1}")]
InvalidPieceCommP(String, cid::Error),
#[error("invalid piece type")]
InvalidPieceType(String, &'static str),
#[error("file {0} is invalid CARv2 file {1}")]
InvalidCARv2(PathBuf, mater::Error),
#[error("no signer key was provider")]
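For context, the net effect of this hunk is that the CLI now builds `PieceInfo` from a typed, validated commitment instead of copying raw multihash digest bytes. Below is a minimal sketch of the new flow; the helper `piece_info_from_commp` is hypothetical, and the `primitives_commitment` signatures (`Commitment::from_cid` returning `Result<_, &'static str>`) are assumed from what the diff shows.

```rust
use std::str::FromStr;

use primitives_commitment::{
    piece::{PaddedPieceSize, PieceInfo},
    Commitment, CommitmentKind,
};

// Hypothetical helper mirroring the hunk above: parse a CommP CID and build a
// validated PieceInfo instead of converting raw digest bytes out of the multihash.
fn piece_info_from_commp(commp: &str, size: PaddedPieceSize) -> Result<PieceInfo, String> {
    // Parse the CID supplied on the command line.
    let cid = cid::Cid::from_str(commp)
        .map_err(|e| format!("invalid CommP {}: {}", commp, e))?;

    // `from_cid` checks that the CID actually describes a piece commitment,
    // replacing the old unchecked `commp.hash().digest().try_into()` conversion.
    let commitment = Commitment::from_cid(&cid, CommitmentKind::Piece)
        .map_err(|e| format!("invalid piece type for {}: {}", commp, e))?;

    Ok(PieceInfo { commitment, size })
}
```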
8 changes: 7 additions & 1 deletion cli/polka-storage-provider/common/src/rpc/error.rs
@@ -1,4 +1,4 @@
use std::fmt::Display;
use std::{fmt::Display, io};

use jsonrpsee::types::{
error::{INTERNAL_ERROR_CODE, INVALID_PARAMS_CODE},
@@ -52,3 +52,9 @@ impl From<subxt::Error> for RpcError {
Self::internal_error(err, None)
}
}

impl From<io::Error> for RpcError {
fn from(err: io::Error) -> Self {
Self::internal_error(err, None)
}
}
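The point of the new `From<io::Error>` impl is to let server code that touches the filesystem propagate errors with `?` instead of mapping them by hand. A minimal illustrative sketch follows; the `file_size` helper is hypothetical and the import path for `RpcError` is assumed from this file's location.

```rust
use std::path::Path;

use polka_storage_provider_common::rpc::RpcError; // import path assumed

// Hypothetical helper, not part of this PR: any std::io::Error is converted
// into an internal RPC error by the From<io::Error> impl added above.
fn file_size(path: &Path) -> Result<u64, RpcError> {
    let metadata = std::fs::metadata(path)?; // io::Error -> RpcError via `?`
    Ok(metadata.len())
}
```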
7 changes: 4 additions & 3 deletions cli/polka-storage-provider/server/Cargo.toml
@@ -12,25 +12,26 @@ version = "0.1.0"
mater = { workspace = true }
polka-storage-proofs = { workspace = true, features = ["std"] }
polka-storage-provider-common = { workspace = true }
primitives-commitment = { workspace = true, features = ["std"] }
primitives-commitment = { workspace = true, features = ["serde", "std"] }
primitives-proofs = { workspace = true, features = ["clap"] }
storagext = { workspace = true, features = ["clap", "insecure_url"] }

async-trait = { workspace = true }
axum = { workspace = true, features = ["macros", "multipart"] }
cid = { workspace = true, features = ["std"] }
cid = { workspace = true, features = ["serde", "std"] }
clap = { workspace = true, features = ["derive"] }
futures = { workspace = true }
jsonrpsee = { workspace = true, features = ["http-client", "macros", "server", "ws-client"] }
rand = { workspace = true }
rocksdb = { workspace = true }
sc-cli = { workspace = true }
serde = { workspace = true, features = ["derive"] }
serde_json = { workspace = true }
subxt = { workspace = true }
tempfile = { workspace = true }
thiserror = { workspace = true }
tokio = { workspace = true }
tokio-util.workspace = true
tokio-util = { workspace = true, features = ["rt"] }
tower-http = { workspace = true, features = ["trace"] }
tracing = { workspace = true }
tracing-subscriber = { workspace = true, features = ["env-filter"] }
83 changes: 79 additions & 4 deletions cli/polka-storage-provider/server/src/db.rs
@@ -1,8 +1,14 @@
use std::path::Path;
use std::{
path::Path,
sync::atomic::{AtomicU64, Ordering},
};

use primitives_proofs::SectorNumber;
use rocksdb::{ColumnFamily, ColumnFamilyDescriptor, Options as DBOptions, DB as RocksDB};
use storagext::types::market::{ConversionError, DealProposal};

use crate::pipeline::types::Sector;

#[derive(Debug, thiserror::Error)]
pub enum DBError {
#[error(transparent)]
@@ -22,11 +28,13 @@ pub enum DBError {
}

const ACCEPTED_DEAL_PROPOSALS_CF: &str = "accepted_deal_proposals";
const SECTORS_CF: &str = "sectors";

const COLUMN_FAMILIES: [&str; 1] = [ACCEPTED_DEAL_PROPOSALS_CF];
const COLUMN_FAMILIES: [&str; 2] = [ACCEPTED_DEAL_PROPOSALS_CF, SECTORS_CF];

pub struct DealDB {
database: RocksDB,
last_sector_number: AtomicU64,
}

impl DealDB {
@@ -42,9 +50,13 @@ impl DealDB {
.into_iter()
.map(|cf_name| ColumnFamilyDescriptor::new(cf_name, DBOptions::default()));

Ok(Self {
let db = Self {
database: RocksDB::open_cf_descriptors(&opts, path, cfs)?,
})
last_sector_number: AtomicU64::new(0),
};

db.initialize_biggest_sector_number()?;
Ok(db)
}

fn cf_handle(&self, name: &str) -> &ColumnFamily {
@@ -106,5 +118,68 @@ impl DealDB {
)?)
}

pub fn get_sector(&self, sector_id: SectorNumber) -> Result<Option<Sector>, DBError> {
let Some(sector_slice) = self
.database
.get_pinned_cf(self.cf_handle(SECTORS_CF), sector_id.to_le_bytes())?
else {
return Ok(None);
};

let sector = serde_json::from_reader(sector_slice.as_ref())
// SAFETY: this should never fail, since this API only ever stores valid sectors;
// if it does fail, it means that someone wrote to the database from a side channel
.expect("invalid content was placed in the database from outside this API");

Ok(Some(sector))
}

pub fn save_sector(&self, sector: &Sector) -> Result<(), DBError> {
let cf_handle = self.cf_handle(SECTORS_CF);
let key = sector.sector_number.to_le_bytes();
let json = serde_json::to_vec(&sector)?;

self.database.put_cf(cf_handle, key, json)?;

Ok(())
}

/// Iterates over all existing sectors and finds the maximum sector number: the simplest possible way of generating an id.
/// This function is private for a reason: it should only be called once, during `DealDB` initialization.
/// After that, `last_sector_number` is only ever incremented by `next_sector_number`.
/// If it were called from multiple threads after initialization, it could cause a race condition and data erasure.
fn initialize_biggest_sector_number(&self) -> Result<(), DBError> {
let mut biggest_sector_number = 0;
for item in self
.database
.iterator_cf(self.cf_handle(SECTORS_CF), rocksdb::IteratorMode::Start)
{
let (key, _) = item?;
let key: [u8; 8] = key
.as_ref()
.try_into()
.expect("sector's key to be u64 le bytes");
let sector_id = SectorNumber::from_le_bytes(key);
biggest_sector_number = std::cmp::max(biggest_sector_number, sector_id);
}

// [`Ordering::Relaxed`] can be used here because this function is executed only once, at start-up.
// There is nothing to synchronize against yet; it is just initialization.
self.last_sector_number
.store(biggest_sector_number, Ordering::Relaxed);
Ok(())
}

/// Atomically increments the sector number counter so the result can be used as a fresh sector identifier.
/// `initialize_biggest_sector_number` must be called at node start-up before any call to this function.
pub fn next_sector_number(&self) -> SectorNumber {
// [`Ordering::Relaxed`] can be used here, as it is an update on a single variable:
// it does not depend on other atomic variables and it does not matter which thread performs the update first.
// We only need the result to differ for every thread that calls this concurrently, so that ids are not duplicated.
let previous = self.last_sector_number.fetch_add(1, Ordering::Relaxed);
previous + 1
}

// NOTE(@jmg-duarte,03/10/2024): I think that from here onwards we're very close of reinventing the LID, but so be it
}