Skip to content

Commit

Permalink
Merge pull request #1838 from subspace/pot-gossip-refactoring
Browse files Browse the repository at this point in the history
PoT gossip refactoring
  • Loading branch information
nazar-pc authored Aug 22, 2023
2 parents 8e04bbf + f77533c commit f782f72
Show file tree
Hide file tree
Showing 6 changed files with 196 additions and 248 deletions.
189 changes: 152 additions & 37 deletions crates/sc-proof-of-time/src/gossip.rs
Original file line number Diff line number Diff line change
@@ -1,70 +1,104 @@
//! PoT gossip functionality.

use crate::state_manager::PotProtocolState;
use crate::PotComponents;
use futures::channel::mpsc;
use futures::{FutureExt, StreamExt};
use parity_scale_codec::Decode;
use parity_scale_codec::{Decode, Encode};
use parking_lot::{Mutex, RwLock};
use sc_client_api::BlockchainEvents;
use sc_network::config::NonDefaultSetConfig;
use sc_network::PeerId;
use sc_network_gossip::{
GossipEngine, MessageIntent, Syncing as GossipSyncing, ValidationResult, Validator,
ValidatorContext,
};
use sp_blockchain::HeaderBackend;
use sp_consensus_subspace::digests::extract_pre_digest;
use sp_runtime::traits::{Block as BlockT, Hash as HashT, Header as HeaderT};
use std::collections::HashSet;
use std::sync::Arc;
use std::time::Instant;
use subspace_core_primitives::crypto::blake2b_256_hash;
use subspace_core_primitives::{Blake2b256Hash, PotProof};
use subspace_proof_of_time::ProofOfTime;
use tracing::{error, trace};
use tracing::{debug, error, trace, warn};

pub(crate) const GOSSIP_PROTOCOL: &str = "/subspace/subspace-proof-of-time";

/// PoT gossip components.
#[derive(Clone)]
pub(crate) struct PotGossip<Block: BlockT> {
/// PoT gossip worker
#[must_use = "Gossip worker doesn't do anything unless run() method is called"]
pub struct PotGossipWorker<Block, Client>
where
Block: BlockT,
{
engine: Arc<Mutex<GossipEngine<Block>>>,
validator: Arc<PotGossipValidator>,
validator: Arc<PotGossipValidator<Block>>,
pot_state: Arc<dyn PotProtocolState>,
client: Arc<Client>,
topic: Block::Hash,
outgoing_messages_sender: mpsc::Sender<PotProof>,
outgoing_messages_receiver: mpsc::Receiver<PotProof>,
}

impl<Block: BlockT> PotGossip<Block> {
/// Creates the gossip components.
pub(crate) fn new<Network, GossipSync>(
impl<Block, Client> PotGossipWorker<Block, Client>
where
Block: BlockT,
Client: HeaderBackend<Block> + BlockchainEvents<Block>,
{
/// Instantiate gossip worker
pub fn new<Network, GossipSync>(
components: &PotComponents,
client: Arc<Client>,
network: Network,
sync: Arc<GossipSync>,
pot_state: Arc<dyn PotProtocolState>,
proof_of_time: ProofOfTime,
) -> Self
where
Network: sc_network_gossip::Network<Block> + Send + Sync + Clone + 'static,
GossipSync: GossipSyncing<Block> + 'static,
{
let validator = Arc::new(PotGossipValidator::new(pot_state, proof_of_time));
let topic =
<<Block::Header as HeaderT>::Hashing as HashT>::hash(b"subspace-proof-of-time-gossip");

let validator = Arc::new(PotGossipValidator::new(
Arc::clone(&components.protocol_state),
components.proof_of_time,
topic,
));
let engine = Arc::new(Mutex::new(GossipEngine::new(
network,
sync,
GOSSIP_PROTOCOL,
validator.clone(),
None,
)));
Self { engine, validator }

let (outgoing_messages_sender, outgoing_messages_receiver) = mpsc::channel(0);

Self {
engine,
validator,
pot_state: Arc::clone(&components.protocol_state),
client,
topic,
outgoing_messages_sender,
outgoing_messages_receiver,
}
}

/// Gossips the message to the network.
pub(crate) fn gossip_message(&self, message: Vec<u8>) {
self.validator.on_broadcast(&message);
self.engine
.lock()
.gossip_message(topic::<Block>(), message, false);
/// Sender that can be used to gossip PoT messages to the network
pub fn gossip_sender(&self) -> mpsc::Sender<PotProof> {
self.outgoing_messages_sender.clone()
}

/// Runs the loop to process incoming messages.
/// Returns when the gossip engine terminates.
pub(crate) async fn process_incoming_messages<'a>(
&self,
process_fn: Arc<dyn Fn(PeerId, PotProof) + Send + Sync + 'a>,
) {
let message_receiver = self.engine.lock().messages_for(topic::<Block>());
/// Run gossip engine.
///
/// NOTE: Even though this function is async, it might do blocking operations internally and
/// should be running on a dedicated thread.
pub async fn run(mut self) {
self.initialize().await;

let message_receiver = self.engine.lock().messages_for(self.topic);
let mut incoming_messages = Box::pin(message_receiver.filter_map(
// Filter out messages without sender or fail to decode.
// TODO: penalize nodes that send garbled messages.
Expand All @@ -85,8 +119,11 @@ impl<Block: BlockT> PotGossip<Block> {
futures::select! {
gossiped = incoming_messages.next().fuse() => {
if let Some((sender, proof)) = gossiped {
(process_fn)(sender, proof);
self.handle_incoming_message(sender, proof);
}
},
outgoing_message = self.outgoing_messages_receiver.select_next_some() => {
self.handle_outgoing_message(outgoing_message)
},
_ = gossip_engine_poll.fuse() => {
error!("Gossip engine has terminated");
Expand All @@ -95,22 +132,102 @@ impl<Block: BlockT> PotGossip<Block> {
}
}
}

/// Initializes the chain state from the consensus tip info.
async fn initialize(&self) {
debug!("Waiting for initialization");

// Wait for a block with proofs.
let mut block_import = self.client.import_notification_stream();
while let Some(incoming_block) = block_import.next().await {
let pre_digest = match extract_pre_digest(&incoming_block.header) {
Ok(pre_digest) => pre_digest,
Err(error) => {
warn!(
%error,
block_hash = %incoming_block.hash,
origin = ?incoming_block.origin,
"Failed to get pre_digest",
);
continue;
}
};

let pot_pre_digest = match pre_digest.pot_pre_digest() {
Some(pot_pre_digest) => pot_pre_digest,
None => {
warn!(
block_hash = %incoming_block.hash,
origin = ?incoming_block.origin,
"Failed to get pot_pre_digest",
);
continue;
}
};

if pot_pre_digest.proofs().is_some() {
trace!(
block_hash = %incoming_block.hash,
origin = ?incoming_block.origin,
?pot_pre_digest,
"Initialization complete",
);
return;
}
}
}

/// Handles the incoming gossip message.
fn handle_incoming_message(&self, sender: PeerId, proof: PotProof) {
let start_ts = Instant::now();
let ret = self.pot_state.on_proof_from_peer(sender, &proof);
let elapsed = start_ts.elapsed();

if let Err(error) = ret {
trace!(%error, %sender, "On gossip");
} else {
trace!(%proof, ?elapsed, %sender, "On gossip");
self.engine
.lock()
.gossip_message(self.topic, proof.encode(), false);
}
}

fn handle_outgoing_message(&self, proof: PotProof) {
let message = proof.encode();
self.validator.on_broadcast(&message);
self.engine
.lock()
.gossip_message(self.topic, message, false);
}
}

/// Validator for gossiped messages
struct PotGossipValidator {
struct PotGossipValidator<Block>
where
Block: BlockT,
{
pot_state: Arc<dyn PotProtocolState>,
proof_of_time: ProofOfTime,
pending: RwLock<HashSet<Blake2b256Hash>>,
topic: Block::Hash,
}

impl PotGossipValidator {
impl<Block> PotGossipValidator<Block>
where
Block: BlockT,
{
/// Creates the validator.
fn new(pot_state: Arc<dyn PotProtocolState>, proof_of_time: ProofOfTime) -> Self {
fn new(
pot_state: Arc<dyn PotProtocolState>,
proof_of_time: ProofOfTime,
topic: Block::Hash,
) -> Self {
Self {
pot_state,
proof_of_time,
pending: RwLock::new(HashSet::new()),
topic,
}
}

Expand All @@ -121,7 +238,10 @@ impl PotGossipValidator {
}
}

impl<Block: BlockT> Validator<Block> for PotGossipValidator {
impl<Block> Validator<Block> for PotGossipValidator<Block>
where
Block: BlockT,
{
fn validate(
&self,
_context: &mut dyn ValidatorContext<Block>,
Expand All @@ -138,7 +258,7 @@ impl<Block: BlockT> Validator<Block> for PotGossipValidator {
trace!(%error, "Verification failed");
ValidationResult::Discard
} else {
ValidationResult::ProcessAndKeep(topic::<Block>())
ValidationResult::ProcessAndKeep(self.topic)
}
}
Err(_) => ValidationResult::Discard,
Expand All @@ -162,11 +282,6 @@ impl<Block: BlockT> Validator<Block> for PotGossipValidator {
}
}

/// PoT message topic.
fn topic<Block: BlockT>() -> Block::Hash {
<<Block::Header as HeaderT>::Hashing as HashT>::hash(b"subspace-proof-of-time-gossip")
}

/// Returns the network configuration for PoT gossip.
pub fn pot_gossip_peers_set_config() -> NonDefaultSetConfig {
let mut cfg = NonDefaultSetConfig::new(GOSSIP_PROTOCOL.into(), 5 * 1024 * 1024);
Expand Down
7 changes: 2 additions & 5 deletions crates/sc-proof-of-time/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,7 @@

#![feature(const_option)]

mod gossip;
mod node_client;
pub mod gossip;
mod state_manager;
mod time_keeper;

Expand All @@ -13,8 +12,6 @@ use std::sync::Arc;
use subspace_core_primitives::{BlockNumber, SlotNumber};
use subspace_proof_of_time::ProofOfTime;

pub use gossip::pot_gossip_peers_set_config;
pub use node_client::PotClient;
pub use state_manager::{
PotConsensusState, PotGetBlockProofsError, PotStateSummary, PotVerifyBlockProofsError,
};
Expand Down Expand Up @@ -87,7 +84,7 @@ impl PotComponents {
let proof_of_time = ProofOfTime::new(config.pot_iterations, config.num_checkpoints)
// TODO: Proper error handling or proof
.expect("Failed to initialize proof of time");
let (protocol_state, consensus_state) = init_pot_state(config, proof_of_time.clone());
let (protocol_state, consensus_state) = init_pot_state(config, proof_of_time);

Self {
is_time_keeper,
Expand Down
Loading

0 comments on commit f782f72

Please sign in to comment.