From 84a142f7a0373cdf4c82b6afa6f5c381862b35ce Mon Sep 17 00:00:00 2001 From: Sergey Shulepov Date: Thu, 25 Nov 2021 17:30:10 +0000 Subject: [PATCH] pvf-precheck: PVF pre-checker subsystem This commit implements the last major piece of #3211: the subsystem that tracks PVFs that require voting, issues pre-check requests to candidate-validation and makes sure that the votes are submitted to the chain. TODO: - [ ] Run once more with polkadot-launch --- Cargo.lock | 24 + Cargo.toml | 1 + node/core/pvf-checker/Cargo.toml | 28 + node/core/pvf-checker/src/interest_view.rs | 125 +++ node/core/pvf-checker/src/lib.rs | 501 +++++++++++ node/core/pvf-checker/src/runtime_api.rs | 109 +++ node/core/pvf-checker/src/tests.rs | 936 +++++++++++++++++++++ node/network/bridge/src/tests.rs | 1 + node/overseer/src/dummy.rs | 6 +- node/overseer/src/lib.rs | 7 +- node/overseer/src/tests.rs | 11 +- node/service/Cargo.toml | 2 + node/service/src/lib.rs | 3 + node/service/src/overseer.rs | 6 + node/subsystem-types/src/messages.rs | 12 + scripts/gitlab/lingua.dic | 2 +- 16 files changed, 1767 insertions(+), 7 deletions(-) create mode 100644 node/core/pvf-checker/Cargo.toml create mode 100644 node/core/pvf-checker/src/interest_view.rs create mode 100644 node/core/pvf-checker/src/lib.rs create mode 100644 node/core/pvf-checker/src/runtime_api.rs create mode 100644 node/core/pvf-checker/src/tests.rs diff --git a/Cargo.lock b/Cargo.lock index 98f7b779702d..7319e0f06241 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -6361,6 +6361,29 @@ dependencies = [ "tracing", ] +[[package]] +name = "polkadot-node-core-pvf-checker" +version = "0.9.13" +dependencies = [ + "futures 0.3.19", + "futures-timer 3.0.2", + "polkadot-node-primitives", + "polkadot-node-subsystem", + "polkadot-node-subsystem-test-helpers", + "polkadot-node-subsystem-util", + "polkadot-overseer", + "polkadot-primitives", + "polkadot-primitives-test-helpers", + "sc-keystore", + "sp-application-crypto", + "sp-core", + "sp-keyring", + "sp-keystore", + "sp-runtime", + "thiserror", + "tracing", +] + [[package]] name = "polkadot-node-core-runtime-api" version = "0.9.13" @@ -6944,6 +6967,7 @@ dependencies = [ "polkadot-node-core-dispute-coordinator", "polkadot-node-core-parachains-inherent", "polkadot-node-core-provisioner", + "polkadot-node-core-pvf-checker", "polkadot-node-core-runtime-api", "polkadot-node-network-protocol", "polkadot-node-primitives", diff --git a/Cargo.toml b/Cargo.toml index beda86e5eff5..1be5452c06bb 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -61,6 +61,7 @@ members = [ "node/core/parachains-inherent", "node/core/provisioner", "node/core/pvf", + "node/core/pvf-checker", "node/core/runtime-api", "node/network/approval-distribution", "node/network/bridge", diff --git a/node/core/pvf-checker/Cargo.toml b/node/core/pvf-checker/Cargo.toml new file mode 100644 index 000000000000..6e2cf4a94461 --- /dev/null +++ b/node/core/pvf-checker/Cargo.toml @@ -0,0 +1,28 @@ +[package] +name = "polkadot-node-core-pvf-checker" +version = "0.9.13" +authors = ["Parity Technologies "] +edition = "2018" + +[dependencies] +futures = "0.3.17" +thiserror = "1.0.30" +tracing = "0.1.29" + +polkadot-node-primitives = { path = "../../primitives" } +polkadot-node-subsystem = { path = "../../subsystem" } +polkadot-primitives = { path = "../../../primitives" } +polkadot-node-subsystem-util = { path = "../../subsystem-util" } +polkadot-overseer = { path = "../../overseer" } + +sp-keystore = { git = "https://github.com/paritytech/substrate", branch = "master" } + +[dev-dependencies] +sp-core = { git = "https://github.com/paritytech/substrate", branch = "master" } +sp-runtime = { git = "https://github.com/paritytech/substrate", branch = "master" } +sc-keystore = { git = "https://github.com/paritytech/substrate", branch = "master" } +sp-keyring = { git = "https://github.com/paritytech/substrate", branch = "master" } +polkadot-node-subsystem-test-helpers = { path = "../../subsystem-test-helpers"} +test-helpers = { package = "polkadot-primitives-test-helpers", path = "../../../primitives/test-helpers" } +sp-application-crypto = { git = "https://github.com/paritytech/substrate", branch = "master" } +futures-timer = "3.0.2" diff --git a/node/core/pvf-checker/src/interest_view.rs b/node/core/pvf-checker/src/interest_view.rs new file mode 100644 index 000000000000..bf867e09cad2 --- /dev/null +++ b/node/core/pvf-checker/src/interest_view.rs @@ -0,0 +1,125 @@ +// Copyright 2021 Parity Technologies (UK) Ltd. +// This file is part of Polkadot. + +// Polkadot is free software: you can redistribute it and/or modify +// it under the terms of the GNU General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. + +// Polkadot is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU General Public License for more details. + +// You should have received a copy of the GNU General Public License +// along with Polkadot. If not, see . + +use polkadot_primitives::v1::{Hash, ValidationCodeHash}; +use std::collections::{ + btree_map::{self, BTreeMap}, + HashSet, +}; + +/// Data about a particular validation code. +#[derive(Default, Debug)] +struct PvfData { + /// If `Some` then the PVF pre-checking was run for this PVF. If `None` we are either waiting + /// for the judgement to come in or the PVF pre-checking failed. + judgement: Option, + + /// The set of block hashes where this PVF was seen. + seen_in: HashSet, +} + +impl PvfData { + /// Initialize a new `PvfData` which is awaiting for the initial judgement. + fn pending(origin: Hash) -> Self { + // Preallocate the hashset with 5 items. This is the anticipated maximum leafs we can + // deal at the same time. In the vast majority of the cases it will have length of 1. + let mut seen_in = HashSet::with_capacity(5); + seen_in.insert(origin); + Self { judgement: None, seen_in } + } + + /// Mark a the `PvfData` as seen in the provided relay-chain block referenced by `relay_hash`. + pub fn seen_in(&mut self, relay_hash: Hash) { + self.seen_in.insert(relay_hash); + } + + /// Removes the given `relay_hash` from the set of seen in, and returns if the set is now empty. + pub fn remove_origin(&mut self, relay_hash: &Hash) -> bool { + self.seen_in.remove(relay_hash); + self.seen_in.is_empty() + } +} + +/// A structure that keeps track of relevant PVFs and judgements about them. A relevant PVF is one +/// that resides in at least a single active leaf. +#[derive(Debug)] +pub struct InterestView { + active_leaves: BTreeMap>, + pvfs: BTreeMap, +} + +impl InterestView { + pub fn new() -> Self { + Self { active_leaves: BTreeMap::new(), pvfs: BTreeMap::new() } + } + + pub fn on_leaves_update( + &mut self, + activated: Option<(Hash, Vec)>, + deactivated: &[Hash], + ) -> Vec { + let mut newcomers = Vec::new(); + + if let Some((leaf, pending_pvfs)) = activated { + for pvf in &pending_pvfs { + match self.pvfs.entry(*pvf) { + btree_map::Entry::Vacant(v) => { + v.insert(PvfData::pending(leaf)); + newcomers.push(*pvf); + }, + btree_map::Entry::Occupied(mut o) => { + o.get_mut().seen_in(leaf); + }, + } + } + self.active_leaves.entry(leaf).or_default().extend(pending_pvfs); + } + + for leaf in deactivated { + let pvfs = self.active_leaves.remove(leaf); + for pvf in pvfs.into_iter().flatten() { + if let btree_map::Entry::Occupied(mut o) = self.pvfs.entry(pvf) { + let now_empty = o.get_mut().remove_origin(leaf); + if now_empty { + o.remove(); + } + } + } + } + + newcomers + } + + /// Handles a new judgement for the given `pvf`. + /// + /// Returns `Err` if the given PVF hash is not known. + pub fn on_judgement(&mut self, subject: ValidationCodeHash, accept: bool) -> Result<(), ()> { + match self.pvfs.get_mut(&subject) { + Some(data) => { + data.judgement = Some(accept); + Ok(()) + }, + None => Err(()), + } + } + + /// Returns all PVFs that previously received a judgement. + pub fn judgements(&self) -> impl Iterator + '_ { + self.pvfs + .iter() + .filter_map(|(code_hash, data)| data.judgement.map(|accept| (*code_hash, accept))) + } +} diff --git a/node/core/pvf-checker/src/lib.rs b/node/core/pvf-checker/src/lib.rs new file mode 100644 index 000000000000..47a28566b3e7 --- /dev/null +++ b/node/core/pvf-checker/src/lib.rs @@ -0,0 +1,501 @@ +// Copyright 2021 Parity Technologies (UK) Ltd. +// This file is part of Polkadot. + +// Polkadot is free software: you can redistribute it and/or modify +// it under the terms of the GNU General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. + +// Polkadot is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU General Public License for more details. + +// You should have received a copy of the GNU General Public License +// along with Polkadot. If not, see . + +//! Implements the PVF pre-checking subsystem. +//! +//! This subsystem is responsible for scanning the chain for PVFs that are pending for the approval +//! as well as submitting statements regarding them passing or not the PVF pre-checking. + +use futures::{channel::oneshot, future::BoxFuture, prelude::*, stream::FuturesUnordered}; + +use polkadot_node_subsystem::{ + messages::{CandidateValidationMessage, PreCheckOutcome, PvfCheckerMessage}, + overseer, ActiveLeavesUpdate, FromOverseer, OverseerSignal, SpawnedSubsystem, SubsystemContext, + SubsystemError, SubsystemResult, SubsystemSender, +}; +use polkadot_primitives::{ + v1::{BlockNumber, Hash, SessionIndex, ValidationCodeHash, ValidatorId, ValidatorIndex}, + v2::PvfCheckStatement, +}; +use sp_keystore::SyncCryptoStorePtr; +use std::collections::HashSet; + +const LOG_TARGET: &str = "parachain::pvf-checker"; + +mod interest_view; +mod runtime_api; + +#[cfg(test)] +mod tests; + +use self::interest_view::InterestView; + +/// PVF pre-checking subsystem. +pub struct PvfCheckerSubsystem { + enabled: bool, + keystore: SyncCryptoStorePtr, +} + +impl PvfCheckerSubsystem { + pub fn new(enabled: bool, keystore: SyncCryptoStorePtr) -> Self { + PvfCheckerSubsystem { enabled, keystore } + } +} + +impl overseer::Subsystem for PvfCheckerSubsystem +where + Context: SubsystemContext, + Context: overseer::SubsystemContext, +{ + fn start(self, ctx: Context) -> SpawnedSubsystem { + if self.enabled { + let future = run(ctx, self.keystore) + .map_err(|e| SubsystemError::with_origin("pvf-checker", e)) + .boxed(); + + SpawnedSubsystem { name: "pvf-checker-subsystem", future } + } else { + polkadot_overseer::DummySubsystem.start(ctx) + } + } +} + +/// A struct that holds the credentials required to sign the PVF check statements. These credentials +/// are implicitly to pinned to a session where our node acts as a validator. +struct SigningCredentials { + /// The validator public key. + validator_key: ValidatorId, + /// The validator index in the current session. + validator_index: ValidatorIndex, +} + +struct State { + /// If `Some` then our node is in the active validator set during the current session. + /// + /// Updated when a new session index is detected in one of the heads. + credentials: Option, + + /// The number and the hash of the most recent block that we have seen. + /// + /// This is only updated when the PVF pre-checking API is detected in a new leaf block. + recent_block: Option<(BlockNumber, Hash)>, + + /// The session index of the most recent session that we have seen. + /// + /// This is only updated when the PVF pre-checking API is detected in a new leaf block. + latest_session: Option, + + /// The set of PVF hashes that we cast a vote for within the current session. + voted: HashSet, + + /// The collection of PVFs that are observed throughout the active heads. + view: InterestView, + + /// The container for the futures that are waiting for the outcome of the pre-checking. + /// + /// Here are some fun facts about these futures: + /// + /// - Pre-checking can take quite some time, in the matter of tens of seconds, so the futures here + /// can soak for quite some time. + /// - Pre-checking of one PVF can take drastically more time than pre-checking of another PVF. + /// This leads to results coming out of order. + /// + /// Resolving to `None` means that the request was dropped before replying. + currently_checking: + FuturesUnordered>>, +} + +async fn run(mut ctx: Context, keystore: SyncCryptoStorePtr) -> SubsystemResult<()> +where + Context: SubsystemContext, + Context: overseer::SubsystemContext, +{ + let mut state = State { + credentials: None, + recent_block: None, + latest_session: None, + voted: HashSet::with_capacity(16), + view: InterestView::new(), + currently_checking: FuturesUnordered::new(), + }; + + loop { + let mut sender = ctx.sender().clone(); + futures::select! { + precheck_response = state.currently_checking.select_next_some() => { + if let Some((outcome, validation_code_hash)) = precheck_response { + handle_pvf_check( + &mut state, + &mut sender, + &keystore, + outcome, + validation_code_hash, + ).await; + } else { + // Pre-checking request dropped before replying. That can happen in case the + // overseer is shutting down. Our part of shutdown will be handled by the + // overseer conclude signal. + } + } + from_overseer = ctx.recv().fuse() => { + let outcome = handle_from_overseer( + &mut state, + &mut sender, + &keystore, + from_overseer?, + ) + .await; + if let Some(Conclude) = outcome { + return Ok(()); + } + } + } + } +} + +/// Handle an incoming PVF pre-check result from the candidate-validation subsystem. +async fn handle_pvf_check( + state: &mut State, + sender: &mut impl SubsystemSender, + keystore: &SyncCryptoStorePtr, + outcome: PreCheckOutcome, + validation_code_hash: ValidationCodeHash, +) { + let accept = match outcome { + PreCheckOutcome::Valid => true, + PreCheckOutcome::Invalid => false, + PreCheckOutcome::Failed => { + // Abstain. + // + // Returning here will leave the PVF in the view dangling. Since it is there, no new + // pre-checking request will be sent. + return + }, + }; + + match state.view.on_judgement(validation_code_hash, accept) { + Ok(()) => (), + Err(()) => { + tracing::debug!( + target: LOG_TARGET, + ?validation_code_hash, + "received judgement for an unknown (or removed) PVF hash", + ); + return + }, + } + + match (state.credentials.as_ref(), state.recent_block, state.latest_session) { + // Note, the availability of credentials implies the availability of the recent block and + // the session index. + (Some(credentials), Some(recent_block), Some(session_index)) => { + sign_and_submit_pvf_check_statement( + sender, + keystore, + &mut state.voted, + credentials, + recent_block.1, + session_index, + accept, + validation_code_hash, + ) + .await; + }, + _ => (), + } +} + +/// A marker for the outer loop that the subsystem should stop. +struct Conclude; + +async fn handle_from_overseer( + state: &mut State, + sender: &mut impl SubsystemSender, + keystore: &SyncCryptoStorePtr, + from_overseer: FromOverseer, +) -> Option { + match from_overseer { + FromOverseer::Signal(OverseerSignal::Conclude) => { + tracing::info!(target: LOG_TARGET, "Received `Conclude` signal, exiting"); + Some(Conclude) + }, + FromOverseer::Signal(OverseerSignal::BlockFinalized(_, _)) => { + // ignore + None + }, + FromOverseer::Signal(OverseerSignal::ActiveLeaves(update)) => { + handle_leaves_update(state, sender, keystore, update).await; + None + }, + FromOverseer::Communication { msg } => match msg { + // uninhabited type, thus statically unreachable. + }, + } +} + +async fn handle_leaves_update( + state: &mut State, + sender: &mut impl SubsystemSender, + keystore: &SyncCryptoStorePtr, + update: ActiveLeavesUpdate, +) { + if let Some(activated) = update.activated { + let ActivationEffect { new_session_index, recent_block, pending_pvfs } = + match examine_activation(state, sender, keystore, activated.hash, activated.number) + .await + { + None => { + // None indicates that the pre-checking runtime API is not supported. + return + }, + Some(e) => e, + }; + + // Note that this is not necessarily the newly activated leaf. + let recent_block_hash = recent_block.1; + state.recent_block = Some(recent_block); + + // Update the PVF view and get the previously unseen PVFs and start working on them. + let newcomers = state + .view + .on_leaves_update(Some((activated.hash, pending_pvfs)), &update.deactivated); + for newcomer in newcomers { + initiate_precheck(state, sender, recent_block_hash, newcomer).await; + } + + if let Some((new_session_index, credentials)) = new_session_index { + // New session change: + // - update the session index + // - reset the set of all PVFs we voted. + // - set (or reset) the credentials. + state.latest_session = Some(new_session_index); + state.voted.clear(); + state.credentials = credentials; + + // If our node is a validator in the new session, we need to re-sign and submit all + // previously obtained judgements. + if let Some(ref credentials) = state.credentials { + for (code_hash, accept) in state.view.judgements() { + sign_and_submit_pvf_check_statement( + sender, + keystore, + &mut state.voted, + credentials, + recent_block_hash, + new_session_index, + accept, + code_hash, + ) + .await; + } + } + } + } else { + state.view.on_leaves_update(None, &update.deactivated); + } +} + +struct ActivationEffect { + /// If the activated leaf is in a new session, the index of the new session. If the new session + /// has a validator in the set our node happened to have private key for, the signing + new_session_index: Option<(SessionIndex, Option)>, + /// This is the block hash and number of the newly activated block if it's "better" than the + /// last one we've seen. The block is better if it's number is higher or if there are no blocks + /// observed whatsoever. If the leaf is not better then this holds the existing recent block. + recent_block: (BlockNumber, Hash), + /// The full list of PVFs that are pending pre-checking according to the runtime API. In case + /// the API returned an error this list is empty. + pending_pvfs: Vec, +} + +/// Examines the new leaf and returns the effects of the examination. +/// +/// Returns `None` if the PVF pre-checking runtime API is not supported for the given leaf hash. +async fn examine_activation( + state: &mut State, + sender: &mut impl SubsystemSender, + keystore: &SyncCryptoStorePtr, + leaf_hash: Hash, + leaf_number: BlockNumber, +) -> Option { + let pending_pvfs = match runtime_api::pvfs_require_precheck(sender, leaf_hash).await { + Err(runtime_api::RuntimeRequestError::NotSupported) => return None, + Err(_) => { + tracing::debug!( + target: LOG_TARGET, + relay_parent = ?leaf_hash, + "cannot fetch PVFs that require pre-checking from runtime API", + ); + Vec::new() + }, + Ok(v) => v, + }; + + let recent_block = match state.recent_block { + Some((recent_block_num, recent_block_hash)) if leaf_number < recent_block_num => { + // the existing recent block is not worse than the new activation, so leave it. + (recent_block_num, recent_block_hash) + }, + _ => (leaf_number, leaf_hash), + }; + + let new_session_index = match runtime_api::session_index_for_child(sender, leaf_hash).await { + Ok(session_index) => + if state.latest_session.map_or(true, |l| l < session_index) { + let signing_credentials = + check_signing_credentials(sender, keystore, leaf_hash).await; + Some((session_index, signing_credentials)) + } else { + None + }, + Err(e) => { + tracing::warn!( + target: LOG_TARGET, + relay_parent = ?leaf_hash, + "cannot fetch session index from runtime API: {:?}", + e, + ); + None + }, + }; + + Some(ActivationEffect { new_session_index, recent_block, pending_pvfs }) +} + +/// Checks the active validators for the given leaf. If we have a signing key for one of them, +/// returns the [`SigningCredentials`]. +async fn check_signing_credentials( + sender: &mut impl SubsystemSender, + keystore: &SyncCryptoStorePtr, + leaf: Hash, +) -> Option { + let validators = match runtime_api::validators(sender, leaf).await { + Ok(v) => v, + Err(e) => { + tracing::warn!( + target: LOG_TARGET, + ?leaf, + "error occured during requesting validators: {:?}", + e + ); + return None + }, + }; + + polkadot_node_subsystem_util::signing_key_and_index(&validators, keystore) + .await + .map(|(validator_key, validator_index)| SigningCredentials { + validator_key, + validator_index, + }) +} + +/// Signs and submits a vote for or against a given validation code. +/// +/// If the validator already voted for the given code, this function does nothing. +async fn sign_and_submit_pvf_check_statement( + sender: &mut impl SubsystemSender, + keystore: &SyncCryptoStorePtr, + voted: &mut HashSet, + credentials: &SigningCredentials, + relay_parent: Hash, + session_index: SessionIndex, + accept: bool, + validation_code_hash: ValidationCodeHash, +) { + if voted.contains(&validation_code_hash) { + tracing::trace!( + target: LOG_TARGET, + relay_parent = ?relay_parent, + ?validation_code_hash, + "already voted for this validation code", + ); + return + } + + voted.insert(validation_code_hash); + + let stmt = PvfCheckStatement { + accept, + session_index, + subject: validation_code_hash, + validator_index: credentials.validator_index, + }; + let signature = match polkadot_node_subsystem_util::sign( + keystore, + &credentials.validator_key, + &stmt.signing_payload(), + ) + .await + { + Ok(Some(signature)) => signature, + Ok(None) => { + tracing::warn!( + target: LOG_TARGET, + ?relay_parent, + validator_index = ?credentials.validator_index, + ?validation_code_hash, + "signature is not available", + ); + return + }, + Err(e) => { + tracing::warn!( + target: LOG_TARGET, + ?relay_parent, + validator_index = ?credentials.validator_index, + ?validation_code_hash, + "error signing the statement: {:?}", + e, + ); + return + }, + }; + + match runtime_api::submit_pvf_check_statement(sender, relay_parent, stmt, signature).await { + Ok(()) => (), + Err(e) => { + tracing::warn!( + target: LOG_TARGET, + ?relay_parent, + ?validation_code_hash, + "error occured during submitting a vote: {:?}", + e, + ); + }, + } +} + +/// Sends a request to the candidate-validation subsystem to validate the given PVF. +/// +/// The relay-parent is used as an anchor from where to fetch the PVF code. The request will be put +/// into the `currently_checking` set. +async fn initiate_precheck( + state: &mut State, + sender: &mut impl SubsystemSender, + relay_parent: Hash, + validation_code_hash: ValidationCodeHash, +) { + let (tx, rx) = oneshot::channel(); + sender + .send_message( + CandidateValidationMessage::PreCheck(relay_parent, validation_code_hash, tx).into(), + ) + .await; + state + .currently_checking + .push(Box::pin(async move { rx.await.ok().map(|accept| (accept, validation_code_hash)) })); +} diff --git a/node/core/pvf-checker/src/runtime_api.rs b/node/core/pvf-checker/src/runtime_api.rs new file mode 100644 index 000000000000..59f94f6c1ac0 --- /dev/null +++ b/node/core/pvf-checker/src/runtime_api.rs @@ -0,0 +1,109 @@ +// Copyright 2021 Parity Technologies (UK) Ltd. +// This file is part of Polkadot. + +// Polkadot is free software: you can redistribute it and/or modify +// it under the terms of the GNU General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. + +// Polkadot is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU General Public License for more details. + +// You should have received a copy of the GNU General Public License +// along with Polkadot. If not, see . + +use crate::LOG_TARGET; +use futures::channel::oneshot; +use polkadot_node_subsystem::{ + errors::RuntimeApiError as RuntimeApiSubsystemError, + messages::{RuntimeApiMessage, RuntimeApiRequest}, + SubsystemSender, +}; +use polkadot_primitives::{ + v1::{Hash, SessionIndex, ValidationCodeHash, ValidatorId, ValidatorSignature}, + v2::PvfCheckStatement, +}; + +pub(crate) async fn session_index_for_child( + sender: &mut impl SubsystemSender, + relay_parent: Hash, +) -> Result { + let (tx, rx) = oneshot::channel(); + runtime_api_request(sender, relay_parent, RuntimeApiRequest::SessionIndexForChild(tx), rx).await +} + +pub(crate) async fn validators( + sender: &mut impl SubsystemSender, + relay_parent: Hash, +) -> Result, RuntimeRequestError> { + let (tx, rx) = oneshot::channel(); + runtime_api_request(sender, relay_parent, RuntimeApiRequest::Validators(tx), rx).await +} + +pub(crate) async fn submit_pvf_check_statement( + sender: &mut impl SubsystemSender, + relay_parent: Hash, + stmt: PvfCheckStatement, + signature: ValidatorSignature, +) -> Result<(), RuntimeRequestError> { + let (tx, rx) = oneshot::channel(); + runtime_api_request( + sender, + relay_parent, + RuntimeApiRequest::SubmitPvfCheckStatement(stmt, signature, tx), + rx, + ) + .await +} + +pub(crate) async fn pvfs_require_precheck( + sender: &mut impl SubsystemSender, + relay_parent: Hash, +) -> Result, RuntimeRequestError> { + let (tx, rx) = oneshot::channel(); + runtime_api_request(sender, relay_parent, RuntimeApiRequest::PvfsRequirePrecheck(tx), rx).await +} + +#[derive(Debug)] +pub(crate) enum RuntimeRequestError { + NotSupported, + ApiError, + CommunicationError, +} + +pub(crate) async fn runtime_api_request( + sender: &mut impl SubsystemSender, + relay_parent: Hash, + request: RuntimeApiRequest, + receiver: oneshot::Receiver>, +) -> Result { + sender + .send_message(RuntimeApiMessage::Request(relay_parent, request).into()) + .await; + + receiver + .await + .map_err(|_| { + tracing::debug!(target: LOG_TARGET, ?relay_parent, "Runtime API request dropped"); + RuntimeRequestError::CommunicationError + }) + .and_then(|res| { + res.map_err(|e| { + use RuntimeApiSubsystemError::*; + match e { + Execution { .. } => { + tracing::debug!( + target: LOG_TARGET, + ?relay_parent, + err = ?e, + "Runtime API request internal error" + ); + RuntimeRequestError::ApiError + }, + NotSupported { .. } => RuntimeRequestError::NotSupported, + } + }) + }) +} diff --git a/node/core/pvf-checker/src/tests.rs b/node/core/pvf-checker/src/tests.rs new file mode 100644 index 000000000000..a64b679cd1e9 --- /dev/null +++ b/node/core/pvf-checker/src/tests.rs @@ -0,0 +1,936 @@ +// Copyright 2021 Parity Technologies (UK) Ltd. +// This file is part of Polkadot. + +// Polkadot is free software: you can redistribute it and/or modify +// it under the terms of the GNU General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. + +// Polkadot is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU General Public License for more details. + +// You should have received a copy of the GNU General Public License +// along with Polkadot. If not, see . + +use ::test_helpers::{dummy_digest, dummy_hash}; +use futures::{channel::oneshot, future::BoxFuture, prelude::*}; +use polkadot_node_subsystem::{ + jaeger, + messages::{ + AllMessages, CandidateValidationMessage, PreCheckOutcome, PvfCheckerMessage, + RuntimeApiMessage, RuntimeApiRequest, + }, + ActivatedLeaf, ActiveLeavesUpdate, FromOverseer, LeafStatus, OverseerSignal, RuntimeApiError, +}; +use polkadot_node_subsystem_test_helpers::{make_subsystem_context, TestSubsystemContextHandle}; +use polkadot_primitives::{ + v1::{ + BlockNumber, Hash, Header, SessionIndex, ValidationCode, ValidationCodeHash, ValidatorId, + }, + v2::PvfCheckStatement, +}; +use sp_application_crypto::AppKey; +use sp_core::testing::TaskExecutor; +use sp_keyring::Sr25519Keyring; +use sp_keystore::SyncCryptoStore; +use sp_runtime::traits::AppVerify; +use std::{collections::HashMap, sync::Arc, time::Duration}; + +type VirtualOverseer = TestSubsystemContextHandle; + +fn dummy_validation_code_hash(descriminator: u8) -> ValidationCodeHash { + ValidationCode(vec![descriminator]).hash() +} + +struct StartsNewSession { + session_index: SessionIndex, + validators: Vec, +} + +#[derive(Debug, Clone)] +struct FakeLeaf { + block_hash: Hash, + block_number: BlockNumber, + pvfs: Vec, +} + +impl FakeLeaf { + fn new(parent_hash: Hash, block_number: BlockNumber, pvfs: Vec) -> Self { + let block_header = Header { + parent_hash, + number: block_number, + digest: dummy_digest(), + state_root: dummy_hash(), + extrinsics_root: dummy_hash(), + }; + let block_hash = block_header.hash(); + Self { block_hash, block_number, pvfs } + } + + fn descendant(&self, pvfs: Vec) -> FakeLeaf { + FakeLeaf::new(self.block_hash, self.block_number + 1, pvfs) + } +} + +struct LeafState { + /// The session index at which this leaf was activated. + session_index: SessionIndex, + + /// The list of PVFs that are pending in this leaf. + pvfs: Vec, +} + +/// The state we model about a session. +struct SessionState { + validators: Vec, +} + +struct TestState { + leaves: HashMap, + sessions: HashMap, + last_session_index: SessionIndex, +} + +const OUR_VALIDATOR: Sr25519Keyring = Sr25519Keyring::Alice; + +fn validator_pubkeys(val_ids: &[Sr25519Keyring]) -> Vec { + val_ids.iter().map(|v| v.public().into()).collect() +} + +impl TestState { + fn new() -> Self { + // Initialize the default session 1. No validators are present there. + let last_session_index = 1; + let mut sessions = HashMap::new(); + sessions.insert(last_session_index, SessionState { validators: vec![] }); + + let mut leaves = HashMap::new(); + leaves.insert(dummy_hash(), LeafState { session_index: last_session_index, pvfs: vec![] }); + + Self { leaves, sessions, last_session_index } + } + + /// A convenience function to receive a message from the overseer and returning `None` if nothing + /// was received within a reasonable (for local tests anyway) timeout. + async fn recv_timeout(&mut self, handle: &mut VirtualOverseer) -> Option { + futures::select! { + msg = handle.recv().fuse() => { + Some(msg) + } + _ = futures_timer::Delay::new(Duration::from_millis(500)).fuse() => { + None + } + } + } + + async fn send_conclude(&mut self, handle: &mut VirtualOverseer) { + // To ensure that no messages are left in the queue there is no better way to just wait. + match self.recv_timeout(handle).await { + Some(msg) => { + panic!("we supposed to conclude, but received a message: {:#?}", msg); + }, + None => { + // No messages are received. We are good. + }, + } + + handle.send(FromOverseer::Signal(OverseerSignal::Conclude)).await; + } + + /// Convenience function to invoke [`active_leaves_update`] with the new leaf that starts a new + /// session and there are no deactivated leaves. + /// + /// Returns the block hash of the newly activated leaf. + async fn activate_leaf_with_session( + &mut self, + handle: &mut VirtualOverseer, + leaf: FakeLeaf, + starts_new_session: StartsNewSession, + ) { + self.active_leaves_update(handle, Some(leaf), Some(starts_new_session), &[]) + .await + } + + /// Convenience function to invoke [`active_leaves_update`] with a new leaf. The leaf does not + /// start a new session and there are no deactivated leaves. + async fn activate_leaf(&mut self, handle: &mut VirtualOverseer, leaf: FakeLeaf) { + self.active_leaves_update(handle, Some(leaf), None, &[]).await + } + + async fn deactive_leaves( + &mut self, + handle: &mut VirtualOverseer, + deactivated: impl IntoIterator, + ) { + self.active_leaves_update(handle, None, None, deactivated).await + } + + /// Sends an `ActiveLeavesUpdate` message to the overseer and also updates the test state to + /// record leaves and session changes. + /// + /// NOTE: This function may stall if there is an unhandled message for the overseer. + async fn active_leaves_update( + &mut self, + handle: &mut VirtualOverseer, + fake_leaf: Option, + starts_new_session: Option, + deactivated: impl IntoIterator, + ) { + if let Some(new_session) = starts_new_session { + assert!(fake_leaf.is_some(), "Session can be started only with an activated leaf"); + self.last_session_index = new_session.session_index; + let prev = self.sessions.insert( + new_session.session_index, + SessionState { validators: validator_pubkeys(&new_session.validators) }, + ); + assert!(prev.is_none(), "Session {} already exists", new_session.session_index); + } + + let activated = if let Some(activated_leaf) = fake_leaf { + self.leaves.insert( + activated_leaf.block_hash.clone(), + LeafState { + session_index: self.last_session_index, + pvfs: activated_leaf.pvfs.clone(), + }, + ); + + Some(ActivatedLeaf { + hash: activated_leaf.block_hash, + span: Arc::new(jaeger::Span::Disabled), + number: activated_leaf.block_number, + status: LeafStatus::Fresh, + }) + } else { + None + }; + + handle + .send(FromOverseer::Signal(OverseerSignal::ActiveLeaves(ActiveLeavesUpdate { + activated, + deactivated: deactivated.into_iter().cloned().collect(), + }))) + .await; + } + + /// Expects that the subsystem has sent a `Validators` Runtime API request. Answers with the + /// mocked validators for the requested leaf. + async fn expect_validators(&mut self, handle: &mut VirtualOverseer) { + match self.recv_timeout(handle).await.expect("timeout waiting for a message") { + AllMessages::RuntimeApi(RuntimeApiMessage::Request( + relay_parent, + RuntimeApiRequest::Validators(tx), + )) => match self.leaves.get(&relay_parent) { + Some(leaf) => { + let session_index = leaf.session_index; + let session = self.sessions.get(&session_index).unwrap(); + tx.send(Ok(session.validators.clone())).unwrap(); + }, + None => { + panic!("a request to an unknown relay parent has been made"); + }, + }, + msg => panic!("Unexpected message was received: {:#?}", msg), + } + } + + /// Expects that the subsystem has sent a `SessionIndexForChild` Runtime API request. Answers + /// with the mocked session index for the requested leaf. + async fn expect_session_for_child(&mut self, handle: &mut VirtualOverseer) { + match self.recv_timeout(handle).await.expect("timeout waiting for a message") { + AllMessages::RuntimeApi(RuntimeApiMessage::Request( + relay_parent, + RuntimeApiRequest::SessionIndexForChild(tx), + )) => match self.leaves.get(&relay_parent) { + Some(leaf) => { + tx.send(Ok(leaf.session_index)).unwrap(); + }, + None => { + panic!("a request to an unknown relay parent has been made"); + }, + }, + msg => panic!("Unexpected message was received: {:#?}", msg), + } + } + + /// Expects that the subsystem has sent a `PvfsRequirePrecheck` Runtime API request. Answers + /// with the mocked PVF set for the requested leaf. + async fn expect_pvfs_require_precheck( + &mut self, + handle: &mut VirtualOverseer, + ) -> ExpectPvfsRequirePrecheck<'_> { + match self.recv_timeout(handle).await.expect("timeout waiting for a message") { + AllMessages::RuntimeApi(RuntimeApiMessage::Request( + relay_parent, + RuntimeApiRequest::PvfsRequirePrecheck(tx), + )) => ExpectPvfsRequirePrecheck { test_state: self, relay_parent, tx }, + msg => panic!("Unexpected message was received: {:#?}", msg), + } + } + + /// Expects that the subsystem has sent a pre-checking request to candidate-validation. Returns + /// a mocked handle for the request. + async fn expect_candidate_precheck( + &mut self, + handle: &mut VirtualOverseer, + ) -> ExpectCandidatePrecheck { + match self.recv_timeout(handle).await.expect("timeout waiting for a message") { + AllMessages::CandidateValidation(CandidateValidationMessage::PreCheck( + relay_parent, + validation_code_hash, + tx, + )) => ExpectCandidatePrecheck { relay_parent, validation_code_hash, tx }, + msg => panic!("Unexpected message was received: {:#?}", msg), + } + } + + /// Expects that the subsystem has sent a `SubmitPvfCheckStatement` runtime API request. Returns + /// a mocked handle for the request. + async fn expect_submit_vote(&mut self, handle: &mut VirtualOverseer) -> ExpectSubmitVote { + match self.recv_timeout(handle).await.expect("timeout waiting for a message") { + AllMessages::RuntimeApi(RuntimeApiMessage::Request( + relay_parent, + RuntimeApiRequest::SubmitPvfCheckStatement(stmt, signature, tx), + )) => { + let signing_payload = stmt.signing_payload(); + assert!(signature.verify(&signing_payload[..], &OUR_VALIDATOR.public().into())); + + ExpectSubmitVote { relay_parent, stmt, tx } + }, + msg => panic!("Unexpected message was received: {:#?}", msg), + } + } +} + +#[must_use] +struct ExpectPvfsRequirePrecheck<'a> { + test_state: &'a mut TestState, + relay_parent: Hash, + tx: oneshot::Sender, RuntimeApiError>>, +} + +impl<'a> ExpectPvfsRequirePrecheck<'a> { + fn reply_mock(self) { + match self.test_state.leaves.get(&self.relay_parent) { + Some(leaf) => { + self.tx.send(Ok(leaf.pvfs.clone())).unwrap(); + }, + None => { + panic!( + "a request to an unknown relay parent has been made: {:#?}", + self.relay_parent + ); + }, + } + } + + fn reply_not_supported(self) { + self.tx + .send(Err(RuntimeApiError::NotSupported { runtime_api_name: "pvfs_require_precheck" })) + .unwrap(); + } +} + +#[must_use] +struct ExpectCandidatePrecheck { + relay_parent: Hash, + validation_code_hash: ValidationCodeHash, + tx: oneshot::Sender, +} + +impl ExpectCandidatePrecheck { + fn reply(self, outcome: PreCheckOutcome) { + self.tx.send(outcome).unwrap(); + } +} + +#[must_use] +struct ExpectSubmitVote { + relay_parent: Hash, + stmt: PvfCheckStatement, + tx: oneshot::Sender>, +} + +impl ExpectSubmitVote { + fn reply_ok(self) { + self.tx.send(Ok(())).unwrap(); + } +} + +fn test_harness(test: impl FnOnce(TestState, VirtualOverseer) -> BoxFuture<'static, ()>) { + let pool = TaskExecutor::new(); + let (ctx, handle) = make_subsystem_context::(pool.clone()); + let keystore = Arc::new(sc_keystore::LocalKeystore::in_memory()); + + // Add OUR_VALIDATOR (which is Alice) to the keystore. + SyncCryptoStore::sr25519_generate_new( + &*keystore, + ValidatorId::ID, + Some(&OUR_VALIDATOR.to_seed()), + ) + .expect("Generating keys for our node failed"); + + let subsystem_task = crate::run(ctx, keystore).map(|x| x.unwrap()); + + let test_state = TestState::new(); + let test_task = test(test_state, handle); + + futures::executor::block_on(future::join(subsystem_task, test_task)); +} + +#[test] +fn concludes_correctly() { + test_harness(|mut test_state, mut handle| { + async move { + test_state.send_conclude(&mut handle).await; + } + .boxed() + }); +} + +#[test] +fn reacts_to_new_pvfs_in_heads() { + test_harness(|mut test_state, mut handle| { + async move { + let block = FakeLeaf::new(dummy_hash(), 1, vec![dummy_validation_code_hash(1)]); + + test_state + .activate_leaf_with_session( + &mut handle, + block.clone(), + StartsNewSession { session_index: 2, validators: vec![OUR_VALIDATOR] }, + ) + .await; + + test_state.expect_pvfs_require_precheck(&mut handle).await.reply_mock(); + test_state.expect_session_for_child(&mut handle).await; + test_state.expect_validators(&mut handle).await; + + let pre_check = test_state.expect_candidate_precheck(&mut handle).await; + assert_eq!(pre_check.relay_parent, block.block_hash); + pre_check.reply(PreCheckOutcome::Valid); + + let vote = test_state.expect_submit_vote(&mut handle).await; + assert_eq!(vote.relay_parent, block.block_hash); + assert_eq!(vote.stmt.accept, true); + assert_eq!(vote.stmt.session_index, 2); + assert_eq!(vote.stmt.validator_index, 0.into()); + assert_eq!(vote.stmt.subject, dummy_validation_code_hash(1)); + vote.reply_ok(); + + test_state.send_conclude(&mut handle).await; + } + .boxed() + }); +} + +#[test] +fn no_new_session_no_validators_request() { + test_harness(|mut test_state, mut handle| { + async move { + test_state + .activate_leaf_with_session( + &mut handle, + FakeLeaf::new(dummy_hash(), 1, vec![]), + StartsNewSession { session_index: 2, validators: vec![OUR_VALIDATOR] }, + ) + .await; + + test_state.expect_pvfs_require_precheck(&mut handle).await.reply_mock(); + test_state.expect_session_for_child(&mut handle).await; + test_state.expect_validators(&mut handle).await; + + test_state + .activate_leaf(&mut handle, FakeLeaf::new(dummy_hash(), 2, vec![])) + .await; + test_state.expect_pvfs_require_precheck(&mut handle).await.reply_mock(); + test_state.expect_session_for_child(&mut handle).await; + + test_state.send_conclude(&mut handle).await; + } + .boxed() + }); +} + +#[test] +fn activation_of_descedant_leaves_pvfs_in_view() { + test_harness(|mut test_state, mut handle| { + async move { + let block_1 = FakeLeaf::new(dummy_hash(), 1, vec![dummy_validation_code_hash(1)]); + let block_2 = block_1.descendant(vec![dummy_validation_code_hash(1)]); + + test_state + .activate_leaf_with_session( + &mut handle, + block_1.clone(), + StartsNewSession { session_index: 2, validators: vec![OUR_VALIDATOR] }, + ) + .await; + + test_state.expect_pvfs_require_precheck(&mut handle).await.reply_mock(); + test_state.expect_session_for_child(&mut handle).await; + test_state.expect_validators(&mut handle).await; + + test_state + .expect_candidate_precheck(&mut handle) + .await + .reply(PreCheckOutcome::Valid); + test_state.expect_submit_vote(&mut handle).await.reply_ok(); + + // Now we deactivate the first block and activate it's descendant. + test_state + .active_leaves_update( + &mut handle, + Some(block_2), + None, // no new session started + &[block_1.block_hash], + ) + .await; + + test_state.expect_pvfs_require_precheck(&mut handle).await.reply_mock(); + test_state.expect_session_for_child(&mut handle).await; + + test_state.send_conclude(&mut handle).await; + } + .boxed() + }); +} + +#[test] +fn reactivating_pvf_leads_to_second_check() { + test_harness(|mut test_state, mut handle| { + async move { + let pvf = dummy_validation_code_hash(1); + let block_1 = FakeLeaf::new(dummy_hash(), 1, vec![pvf.clone()]); + let block_2 = block_1.descendant(vec![]); + let block_3 = block_2.descendant(vec![pvf.clone()]); + + test_state + .activate_leaf_with_session( + &mut handle, + block_1.clone(), + StartsNewSession { session_index: 2, validators: vec![OUR_VALIDATOR] }, + ) + .await; + + test_state.expect_pvfs_require_precheck(&mut handle).await.reply_mock(); + test_state.expect_session_for_child(&mut handle).await; + test_state.expect_validators(&mut handle).await; + test_state + .expect_candidate_precheck(&mut handle) + .await + .reply(PreCheckOutcome::Valid); + test_state.expect_submit_vote(&mut handle).await.reply_ok(); + + // Now activate a descdedant leaf, where the PVF is not present. + test_state + .active_leaves_update( + &mut handle, + Some(block_2.clone()), + None, + &[block_1.block_hash], + ) + .await; + test_state.expect_pvfs_require_precheck(&mut handle).await.reply_mock(); + test_state.expect_session_for_child(&mut handle).await; + + // Now the third block is activated, where the PVF is present. + test_state.activate_leaf(&mut handle, block_3).await; + test_state.expect_pvfs_require_precheck(&mut handle).await.reply_mock(); + test_state.expect_session_for_child(&mut handle).await; + test_state + .expect_candidate_precheck(&mut handle) + .await + .reply(PreCheckOutcome::Valid); + + // We do not vote here, because the PVF was already voted on within this session. + + test_state.send_conclude(&mut handle).await; + } + .boxed() + }); +} + +#[test] +fn dont_double_vote_for_pvfs_in_view() { + test_harness(|mut test_state, mut handle| { + async move { + let pvf = dummy_validation_code_hash(1); + let block_1_1 = FakeLeaf::new([1; 32].into(), 1, vec![pvf.clone()]); + let block_2_1 = FakeLeaf::new([2; 32].into(), 1, vec![pvf.clone()]); + let block_1_2 = block_1_1.descendant(vec![pvf.clone()]); + + test_state + .activate_leaf_with_session( + &mut handle, + block_1_1.clone(), + StartsNewSession { session_index: 2, validators: vec![OUR_VALIDATOR] }, + ) + .await; + + test_state.expect_pvfs_require_precheck(&mut handle).await.reply_mock(); + test_state.expect_session_for_child(&mut handle).await; + test_state.expect_validators(&mut handle).await; + + // Pre-checking will take quite some time. + let pre_check = test_state.expect_candidate_precheck(&mut handle).await; + + // Activate a sibiling leaf, has the same PVF. + test_state.activate_leaf(&mut handle, block_2_1).await; + test_state.expect_pvfs_require_precheck(&mut handle).await.reply_mock(); + test_state.expect_session_for_child(&mut handle).await; + + // Now activate a descendant leaf with the same PVF. + test_state + .active_leaves_update( + &mut handle, + Some(block_1_2.clone()), + None, + &[block_1_1.block_hash], + ) + .await; + test_state.expect_pvfs_require_precheck(&mut handle).await.reply_mock(); + test_state.expect_session_for_child(&mut handle).await; + + // Now finish the pre-checking request. + pre_check.reply(PreCheckOutcome::Valid); + test_state.expect_submit_vote(&mut handle).await.reply_ok(); + + test_state.send_conclude(&mut handle).await; + } + .boxed() + }); +} + +#[test] +fn judgements_come_out_of_order() { + test_harness(|mut test_state, mut handle| { + async move { + let pvf_1 = dummy_validation_code_hash(1); + let pvf_2 = dummy_validation_code_hash(2); + + let block_1 = FakeLeaf::new([1; 32].into(), 1, vec![pvf_1.clone()]); + let block_2 = FakeLeaf::new([2; 32].into(), 1, vec![pvf_2.clone()]); + + test_state + .activate_leaf_with_session( + &mut handle, + block_1.clone(), + StartsNewSession { session_index: 2, validators: vec![OUR_VALIDATOR] }, + ) + .await; + + test_state.expect_pvfs_require_precheck(&mut handle).await.reply_mock(); + test_state.expect_session_for_child(&mut handle).await; + test_state.expect_validators(&mut handle).await; + + let pre_check_1 = test_state.expect_candidate_precheck(&mut handle).await; + + // Activate a sibiling leaf, has the second PVF. + test_state.activate_leaf(&mut handle, block_2).await; + test_state.expect_pvfs_require_precheck(&mut handle).await.reply_mock(); + test_state.expect_session_for_child(&mut handle).await; + + let pre_check_2 = test_state.expect_candidate_precheck(&mut handle).await; + + // Resolve the PVF pre-checks out of order. + pre_check_2.reply(PreCheckOutcome::Valid); + pre_check_1.reply(PreCheckOutcome::Invalid); + + // Catch the vote for the second PVF. + let vote_2 = test_state.expect_submit_vote(&mut handle).await; + assert_eq!(vote_2.stmt.accept, true); + assert_eq!(vote_2.stmt.subject, pvf_2.clone()); + vote_2.reply_ok(); + + // Catch the vote for the first PVF. + let vote_1 = test_state.expect_submit_vote(&mut handle).await; + assert_eq!(vote_1.stmt.accept, false); + assert_eq!(vote_1.stmt.subject, pvf_1.clone()); + vote_1.reply_ok(); + + test_state.send_conclude(&mut handle).await; + } + .boxed() + }); +} + +#[test] +fn dont_vote_until_a_validator() { + test_harness(|mut test_state, mut handle| { + async move { + test_state + .activate_leaf_with_session( + &mut handle, + FakeLeaf::new(dummy_hash(), 1, vec![dummy_validation_code_hash(1)]), + StartsNewSession { session_index: 2, validators: vec![Sr25519Keyring::Bob] }, + ) + .await; + + test_state.expect_pvfs_require_precheck(&mut handle).await.reply_mock(); + test_state.expect_session_for_child(&mut handle).await; + test_state.expect_validators(&mut handle).await; + + test_state + .expect_candidate_precheck(&mut handle) + .await + .reply(PreCheckOutcome::Invalid); + + // Now a leaf brings a new session. In this session our validator comes into the active + // set. That means it will cast a vote for each judgement it has. + test_state + .activate_leaf_with_session( + &mut handle, + FakeLeaf::new(dummy_hash(), 2, vec![dummy_validation_code_hash(1)]), + StartsNewSession { + session_index: 3, + validators: vec![Sr25519Keyring::Bob, OUR_VALIDATOR], + }, + ) + .await; + + test_state.expect_pvfs_require_precheck(&mut handle).await.reply_mock(); + test_state.expect_session_for_child(&mut handle).await; + test_state.expect_validators(&mut handle).await; + let vote = test_state.expect_submit_vote(&mut handle).await; + assert_eq!(vote.stmt.accept, false); + assert_eq!(vote.stmt.session_index, 3); + assert_eq!(vote.stmt.validator_index, 1.into()); + assert_eq!(vote.stmt.subject, dummy_validation_code_hash(1)); + vote.reply_ok(); + + test_state.send_conclude(&mut handle).await; + } + .boxed() + }); +} + +#[test] +fn resign_on_session_change() { + test_harness(|mut test_state, mut handle| { + async move { + let pvf_1 = dummy_validation_code_hash(1); + let pvf_2 = dummy_validation_code_hash(2); + + test_state + .activate_leaf_with_session( + &mut handle, + FakeLeaf::new(dummy_hash(), 1, vec![pvf_1, pvf_2]), + StartsNewSession { session_index: 2, validators: vec![OUR_VALIDATOR] }, + ) + .await; + test_state.expect_pvfs_require_precheck(&mut handle).await.reply_mock(); + test_state.expect_session_for_child(&mut handle).await; + test_state.expect_validators(&mut handle).await; + + let pre_check_1 = test_state.expect_candidate_precheck(&mut handle).await; + assert_eq!(pre_check_1.validation_code_hash, pvf_1); + pre_check_1.reply(PreCheckOutcome::Valid); + let pre_check_2 = test_state.expect_candidate_precheck(&mut handle).await; + assert_eq!(pre_check_2.validation_code_hash, pvf_2); + pre_check_2.reply(PreCheckOutcome::Invalid); + + test_state.expect_submit_vote(&mut handle).await.reply_ok(); + test_state.expect_submit_vote(&mut handle).await.reply_ok(); + + // So far so good. Now we change the session. + test_state + .activate_leaf_with_session( + &mut handle, + FakeLeaf::new(dummy_hash(), 2, vec![pvf_1, pvf_2]), + StartsNewSession { session_index: 3, validators: vec![OUR_VALIDATOR] }, + ) + .await; + test_state.expect_pvfs_require_precheck(&mut handle).await.reply_mock(); + test_state.expect_session_for_child(&mut handle).await; + test_state.expect_validators(&mut handle).await; + + // The votes should be re-signed and re-submitted. + let mut statements = Vec::new(); + let vote_1 = test_state.expect_submit_vote(&mut handle).await; + statements.push(vote_1.stmt.clone()); + vote_1.reply_ok(); + let vote_2 = test_state.expect_submit_vote(&mut handle).await; + statements.push(vote_2.stmt.clone()); + vote_2.reply_ok(); + + // Find and check the votes. + // Unfortunately, the order of revoting is not deterministic so we have to resort to + // a bit of trickery. + assert_eq!(statements.iter().find(|s| s.subject == pvf_1).unwrap().accept, true); + assert_eq!(statements.iter().find(|s| s.subject == pvf_2).unwrap().accept, false); + + test_state.send_conclude(&mut handle).await; + } + .boxed() + }); +} + +#[test] +fn dont_resign_if_not_us() { + test_harness(|mut test_state, mut handle| { + async move { + let pvf_1 = dummy_validation_code_hash(1); + let pvf_2 = dummy_validation_code_hash(2); + + test_state + .activate_leaf_with_session( + &mut handle, + FakeLeaf::new(dummy_hash(), 1, vec![pvf_1, pvf_2]), + StartsNewSession { session_index: 2, validators: vec![OUR_VALIDATOR] }, + ) + .await; + test_state.expect_pvfs_require_precheck(&mut handle).await.reply_mock(); + test_state.expect_session_for_child(&mut handle).await; + test_state.expect_validators(&mut handle).await; + + let pre_check_1 = test_state.expect_candidate_precheck(&mut handle).await; + assert_eq!(pre_check_1.validation_code_hash, pvf_1); + pre_check_1.reply(PreCheckOutcome::Valid); + let pre_check_2 = test_state.expect_candidate_precheck(&mut handle).await; + assert_eq!(pre_check_2.validation_code_hash, pvf_2); + pre_check_2.reply(PreCheckOutcome::Invalid); + + test_state.expect_submit_vote(&mut handle).await.reply_ok(); + test_state.expect_submit_vote(&mut handle).await.reply_ok(); + + // So far so good. Now we change the session. + test_state + .activate_leaf_with_session( + &mut handle, + FakeLeaf::new(dummy_hash(), 2, vec![pvf_1, pvf_2]), + StartsNewSession { + session_index: 3, + // not us + validators: vec![Sr25519Keyring::Bob], + }, + ) + .await; + test_state.expect_pvfs_require_precheck(&mut handle).await.reply_mock(); + test_state.expect_session_for_child(&mut handle).await; + test_state.expect_validators(&mut handle).await; + + // We do not expect any votes to be re-signed. + + test_state.send_conclude(&mut handle).await; + } + .boxed() + }); +} + +#[test] +fn api_not_supported() { + test_harness(|mut test_state, mut handle| { + async move { + test_state + .activate_leaf_with_session( + &mut handle, + FakeLeaf::new(dummy_hash(), 1, vec![dummy_validation_code_hash(1)]), + StartsNewSession { session_index: 2, validators: vec![OUR_VALIDATOR] }, + ) + .await; + + test_state.expect_pvfs_require_precheck(&mut handle).await.reply_not_supported(); + test_state.send_conclude(&mut handle).await; + } + .boxed() + }); +} + +#[test] +fn not_supported_api_becomes_supported() { + test_harness(|mut test_state, mut handle| { + async move { + test_state + .activate_leaf_with_session( + &mut handle, + FakeLeaf::new(dummy_hash(), 1, vec![dummy_validation_code_hash(1)]), + StartsNewSession { session_index: 2, validators: vec![OUR_VALIDATOR] }, + ) + .await; + + test_state.expect_pvfs_require_precheck(&mut handle).await.reply_not_supported(); + + test_state + .activate_leaf_with_session( + &mut handle, + FakeLeaf::new(dummy_hash(), 1, vec![dummy_validation_code_hash(1)]), + StartsNewSession { session_index: 3, validators: vec![OUR_VALIDATOR] }, + ) + .await; + + test_state.expect_pvfs_require_precheck(&mut handle).await.reply_mock(); + test_state.expect_session_for_child(&mut handle).await; + test_state.expect_validators(&mut handle).await; + test_state + .expect_candidate_precheck(&mut handle) + .await + .reply(PreCheckOutcome::Valid); + test_state.expect_submit_vote(&mut handle).await.reply_ok(); + + test_state.send_conclude(&mut handle).await; + } + .boxed() + }); +} + +#[test] +fn unexpected_pvf_check_judgement() { + test_harness(|mut test_state, mut handle| { + async move { + let block_1 = FakeLeaf::new(dummy_hash(), 1, vec![dummy_validation_code_hash(1)]); + test_state + .activate_leaf_with_session( + &mut handle, + block_1.clone(), + StartsNewSession { session_index: 2, validators: vec![OUR_VALIDATOR] }, + ) + .await; + + test_state.expect_pvfs_require_precheck(&mut handle).await.reply_mock(); + test_state.expect_session_for_child(&mut handle).await; + test_state.expect_validators(&mut handle).await; + + // Catch the pre-check request, but don't reply just yet. + let pre_check = test_state.expect_candidate_precheck(&mut handle).await; + + // Now deactive the leaf and reply to the precheck request. + test_state.deactive_leaves(&mut handle, &[block_1.block_hash]).await; + pre_check.reply(PreCheckOutcome::Invalid); + + // the subsystem must remain silent. + + test_state.send_conclude(&mut handle).await; + } + .boxed() + }); +} + +#[test] +fn abstain_for_nondeterministic_pvfcheck_failure() { + test_harness(|mut test_state, mut handle| { + async move { + test_state + .activate_leaf_with_session( + &mut handle, + FakeLeaf::new(dummy_hash(), 1, vec![dummy_validation_code_hash(1)]), + StartsNewSession { session_index: 2, validators: vec![OUR_VALIDATOR] }, + ) + .await; + + test_state.expect_pvfs_require_precheck(&mut handle).await.reply_mock(); + test_state.expect_session_for_child(&mut handle).await; + test_state.expect_validators(&mut handle).await; + + test_state + .expect_candidate_precheck(&mut handle) + .await + .reply(PreCheckOutcome::Failed); + + test_state.send_conclude(&mut handle).await; + } + .boxed() + }); +} diff --git a/node/network/bridge/src/tests.rs b/node/network/bridge/src/tests.rs index f643a62d38d5..30c1f964e520 100644 --- a/node/network/bridge/src/tests.rs +++ b/node/network/bridge/src/tests.rs @@ -1228,6 +1228,7 @@ fn spread_event_to_subsystems_is_up_to_date() { AllMessages::DisputeCoordinator(_) => unreachable!("Not interested in network events"), AllMessages::DisputeDistribution(_) => unreachable!("Not interested in network events"), AllMessages::ChainSelection(_) => unreachable!("Not interested in network events"), + AllMessages::PvfChecker(_) => unreachable!("Not interested in network events"), // Add variants here as needed, `{ cnt += 1; }` for those that need to be // notified, `unreachable!()` for those that should not. } diff --git a/node/overseer/src/dummy.rs b/node/overseer/src/dummy.rs index bc93ffa064f7..216356cf3f70 100644 --- a/node/overseer/src/dummy.rs +++ b/node/overseer/src/dummy.rs @@ -89,6 +89,7 @@ pub fn dummy_overseer_builder<'a, Spawner, SupportsParachains>( DummySubsystem, DummySubsystem, DummySubsystem, + DummySubsystem, >, SubsystemError, > @@ -129,6 +130,7 @@ pub fn one_for_all_overseer_builder<'a, Spawner, SupportsParachains, Sub>( Sub, Sub, Sub, + Sub, >, SubsystemError, > @@ -155,7 +157,8 @@ where + Subsystem, SubsystemError> + Subsystem, SubsystemError> + Subsystem, SubsystemError> - + Subsystem, SubsystemError>, + + Subsystem, SubsystemError> + + Subsystem, SubsystemError>, { let metrics = ::register(registry)?; @@ -167,6 +170,7 @@ where .bitfield_signing(subsystem.clone()) .candidate_backing(subsystem.clone()) .candidate_validation(subsystem.clone()) + .pvf_checker(subsystem.clone()) .chain_api(subsystem.clone()) .collation_generation(subsystem.clone()) .collator_protocol(subsystem.clone()) diff --git a/node/overseer/src/lib.rs b/node/overseer/src/lib.rs index 0a947506cd90..f12e21245a77 100644 --- a/node/overseer/src/lib.rs +++ b/node/overseer/src/lib.rs @@ -84,8 +84,8 @@ use polkadot_node_subsystem_types::messages::{ BitfieldSigningMessage, CandidateBackingMessage, CandidateValidationMessage, ChainApiMessage, ChainSelectionMessage, CollationGenerationMessage, CollatorProtocolMessage, DisputeCoordinatorMessage, DisputeDistributionMessage, GossipSupportMessage, - NetworkBridgeEvent, NetworkBridgeMessage, ProvisionerMessage, RuntimeApiMessage, - StatementDistributionMessage, + NetworkBridgeEvent, NetworkBridgeMessage, ProvisionerMessage, PvfCheckerMessage, + RuntimeApiMessage, StatementDistributionMessage, }; pub use polkadot_node_subsystem_types::{ errors::{SubsystemError, SubsystemResult}, @@ -420,6 +420,9 @@ pub struct Overseer { #[subsystem(no_dispatch, CandidateValidationMessage)] candidate_validation: CandidateValidation, + #[subsystem(no_dispatch, PvfCheckerMessage)] + pvf_checker: PvfChecker, + #[subsystem(no_dispatch, CandidateBackingMessage)] candidate_backing: CandidateBacking, diff --git a/node/overseer/src/tests.rs b/node/overseer/src/tests.rs index c1ab8bafc27d..77145f0dc06f 100644 --- a/node/overseer/src/tests.rs +++ b/node/overseer/src/tests.rs @@ -920,9 +920,9 @@ fn test_chain_selection_msg() -> ChainSelectionMessage { // Checks that `stop`, `broadcast_signal` and `broadcast_message` are implemented correctly. #[test] fn overseer_all_subsystems_receive_signals_and_messages() { - const NUM_SUBSYSTEMS: usize = 20; - // -3 for BitfieldSigning, GossipSupport and AvailabilityDistribution - const NUM_SUBSYSTEMS_MESSAGED: usize = NUM_SUBSYSTEMS - 3; + const NUM_SUBSYSTEMS: usize = 21; + // -4 for BitfieldSigning, GossipSupport, AvailabilityDistribution and PvfCheckerSubsystem. + const NUM_SUBSYSTEMS_MESSAGED: usize = NUM_SUBSYSTEMS - 4; let spawner = sp_core::testing::TaskExecutor::new(); executor::block_on(async move { @@ -1005,6 +1005,7 @@ fn overseer_all_subsystems_receive_signals_and_messages() { handle .send_msg_anon(AllMessages::ChainSelection(test_chain_selection_msg())) .await; + // handle.send_msg_anon(AllMessages::PvfChecker(test_pvf_checker_msg())).await; // Wait until all subsystems have received. Otherwise the messages might race against // the conclude signal. @@ -1058,6 +1059,7 @@ fn context_holds_onto_message_until_enough_signals_received() { let (dispute_coordinator_bounded_tx, _) = metered::channel(CHANNEL_CAPACITY); let (dispute_distribution_bounded_tx, _) = metered::channel(CHANNEL_CAPACITY); let (chain_selection_bounded_tx, _) = metered::channel(CHANNEL_CAPACITY); + let (pvf_checker_bounded_tx, _) = metered::channel(CHANNEL_CAPACITY); let (candidate_validation_unbounded_tx, _) = metered::unbounded(); let (candidate_backing_unbounded_tx, _) = metered::unbounded(); @@ -1079,6 +1081,7 @@ fn context_holds_onto_message_until_enough_signals_received() { let (dispute_coordinator_unbounded_tx, _) = metered::unbounded(); let (dispute_distribution_unbounded_tx, _) = metered::unbounded(); let (chain_selection_unbounded_tx, _) = metered::unbounded(); + let (pvf_checker_unbounded_tx, _) = metered::unbounded(); let channels_out = ChannelsOut { candidate_validation: candidate_validation_bounded_tx.clone(), @@ -1101,6 +1104,7 @@ fn context_holds_onto_message_until_enough_signals_received() { dispute_coordinator: dispute_coordinator_bounded_tx.clone(), dispute_distribution: dispute_distribution_bounded_tx.clone(), chain_selection: chain_selection_bounded_tx.clone(), + pvf_checker: pvf_checker_bounded_tx.clone(), candidate_validation_unbounded: candidate_validation_unbounded_tx.clone(), candidate_backing_unbounded: candidate_backing_unbounded_tx.clone(), @@ -1122,6 +1126,7 @@ fn context_holds_onto_message_until_enough_signals_received() { dispute_coordinator_unbounded: dispute_coordinator_unbounded_tx.clone(), dispute_distribution_unbounded: dispute_distribution_unbounded_tx.clone(), chain_selection_unbounded: chain_selection_unbounded_tx.clone(), + pvf_checker_unbounded: pvf_checker_unbounded_tx.clone(), }; let (mut signal_tx, signal_rx) = metered::channel(CHANNEL_CAPACITY); diff --git a/node/service/Cargo.toml b/node/service/Cargo.toml index caee584e760a..eb051644b174 100644 --- a/node/service/Cargo.toml +++ b/node/service/Cargo.toml @@ -115,6 +115,7 @@ polkadot-node-core-chain-api = { path = "../core/chain-api", optional = true } polkadot-node-core-chain-selection = { path = "../core/chain-selection", optional = true } polkadot-node-core-dispute-coordinator = { path = "../core/dispute-coordinator", optional = true } polkadot-node-core-provisioner = { path = "../core/provisioner", optional = true } +polkadot-node-core-pvf-checker = { path = "../core/pvf-checker", optional = true } polkadot-node-core-runtime-api = { path = "../core/runtime-api", optional = true } polkadot-statement-distribution = { path = "../network/statement-distribution", optional = true } @@ -154,6 +155,7 @@ full-node = [ "polkadot-node-core-runtime-api", "polkadot-statement-distribution", "polkadot-approval-distribution", + "polkadot-node-core-pvf-checker", "kvdb-rocksdb" ] diff --git a/node/service/src/lib.rs b/node/service/src/lib.rs index 47a130451a00..909ade280149 100644 --- a/node/service/src/lib.rs +++ b/node/service/src/lib.rs @@ -727,6 +727,8 @@ where chain_spec.is_versi() || chain_spec.is_wococo(); + let pvf_checker_enabled = !is_collator.is_collator() && chain_spec.is_versi(); + let select_chain = if requires_overseer_for_chain_sel { let metrics = polkadot_node_subsystem_util::metrics::Metrics::register(prometheus_registry.as_ref())?; @@ -960,6 +962,7 @@ where chain_selection_config, dispute_coordinator_config, disputes_enabled, + pvf_checker_enabled, }, ) .map_err(|e| { diff --git a/node/service/src/overseer.rs b/node/service/src/overseer.rs index 520db8127d10..bbdc7692fc52 100644 --- a/node/service/src/overseer.rs +++ b/node/service/src/overseer.rs @@ -61,6 +61,7 @@ pub use polkadot_node_core_chain_api::ChainApiSubsystem; pub use polkadot_node_core_chain_selection::ChainSelectionSubsystem; pub use polkadot_node_core_dispute_coordinator::DisputeCoordinatorSubsystem; pub use polkadot_node_core_provisioner::ProvisionerSubsystem; +pub use polkadot_node_core_pvf_checker::PvfCheckerSubsystem; pub use polkadot_node_core_runtime_api::RuntimeApiSubsystem; pub use polkadot_statement_distribution::StatementDistributionSubsystem; @@ -109,6 +110,8 @@ where pub dispute_coordinator_config: DisputeCoordinatorConfig, /// Enable to disputes. pub disputes_enabled: bool, + /// Enable PVF pre-checking + pub pvf_checker_enabled: bool, } /// Obtain a prepared `OverseerBuilder`, that is initialized @@ -136,12 +139,14 @@ pub fn prepared_overseer_builder<'a, Spawner, RuntimeClient>( chain_selection_config, dispute_coordinator_config, disputes_enabled, + pvf_checker_enabled, }: OverseerGenArgs<'a, Spawner, RuntimeClient>, ) -> Result< OverseerBuilder< Spawner, Arc, CandidateValidationSubsystem, + PvfCheckerSubsystem, CandidateBackingSubsystem, StatementDistributionSubsystem, AvailabilityDistributionSubsystem, @@ -208,6 +213,7 @@ where Metrics::register(registry)?, // candidate-validation metrics Metrics::register(registry)?, // validation host metrics )) + .pvf_checker(PvfCheckerSubsystem::new(pvf_checker_enabled, keystore.clone())) .chain_api(ChainApiSubsystem::new(runtime_client.clone(), Metrics::register(registry)?)) .collation_generation(CollationGenerationSubsystem::new(Metrics::register(registry)?)) .collator_protocol({ diff --git a/node/subsystem-types/src/messages.rs b/node/subsystem-types/src/messages.rs index 8a1ccd7af00d..e6f311143249 100644 --- a/node/subsystem-types/src/messages.rs +++ b/node/subsystem-types/src/messages.rs @@ -910,3 +910,15 @@ pub enum GossipSupportMessage { #[from] NetworkBridgeUpdateV1(NetworkBridgeEvent), } + +/// PVF checker message. +/// +/// Currently non-instantiable. +#[derive(Debug)] +pub enum PvfCheckerMessage {} + +impl BoundToRelayParent for PvfCheckerMessage { + fn relay_parent(&self) -> Hash { + match *self {} + } +} diff --git a/scripts/gitlab/lingua.dic b/scripts/gitlab/lingua.dic index 944779c8daad..bf70b0512e40 100644 --- a/scripts/gitlab/lingua.dic +++ b/scripts/gitlab/lingua.dic @@ -124,7 +124,7 @@ isolate/BG iterable jaeger/MS js -judgement +judgement/S keccak256/M keypair/MS keystore/MS