Skip to content

Commit

Permalink
Add integration skeleton of parallel-executor (#2617)
Browse files Browse the repository at this point in the history
## Description

This PR is the first one of a sequel of PR that will introduce the
`parallel-executor` to the codebase.

The goal of this first PR is to create the new module and showcase the
smooth integration with the existing codebase.

The instantiation of the `parallel-executor` has been commented to pass
the tests with `--all-features` however I want to keep them to showcase
what the integration looks like and will be uncommented when real code
exists.

## Checklist
- [x] Breaking changes are clearly marked as such in the PR description
and changelog
- [x] New behavior is reflected in tests
- [x] [The specification](https://github.com/FuelLabs/fuel-specs/)
matches the implemented behavior (link update PR if changes are needed)

### Before requesting review
- [x] I have reviewed the code myself
- [x] I have created follow-up issues caused by this PR and linked them
here
  • Loading branch information
AurelienFT authored Feb 3, 2025
1 parent 1e852a3 commit b501db2
Show file tree
Hide file tree
Showing 15 changed files with 213 additions and 5 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ and this project adheres to [Semantic Versioning](http://semver.org/).
- [2643](https://github.com/FuelLabs/fuel-core/pull/2643): Before this fix when tip is zero, transactions that use 30M have the same priority as transactions with 1M gas. Now they are correctly ordered.

### Added
- [2617](https://github.com/FuelLabs/fuel-core/pull/2617): Add integration skeleton of parallel-executor.
- [2553](https://github.com/FuelLabs/fuel-core/pull/2553): Scaffold global merkle root storage crate.
- [2598](https://github.com/FuelLabs/fuel-core/pull/2598): Add initial test suite for global merkle root storage updates.

Expand Down
11 changes: 11 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 2 additions & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ members = [
"crates/services/gas_price_service",
"crates/services/importer",
"crates/services/p2p",
"crates/services/parallel-executor",
"crates/services/producer",
"crates/services/relayer",
"crates/services/shared-sequencer",
Expand Down Expand Up @@ -81,6 +82,7 @@ fuel-core-executor = { version = "0.41.4", path = "./crates/services/executor",
fuel-core-importer = { version = "0.41.4", path = "./crates/services/importer" }
fuel-core-gas-price-service = { version = "0.41.4", path = "crates/services/gas_price_service" }
fuel-core-p2p = { version = "0.41.4", path = "./crates/services/p2p" }
fuel-core-parallel-executor = { version = "0.41.4", path = "./crates/services/parallel-executor" }
fuel-core-producer = { version = "0.41.4", path = "./crates/services/producer" }
fuel-core-relayer = { version = "0.41.4", path = "./crates/services/relayer" }
fuel-core-sync = { version = "0.41.4", path = "./crates/services/sync" }
Expand Down
1 change: 1 addition & 0 deletions bin/fuel-core/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -83,3 +83,4 @@ production = [
"aws-kms",
]
fault-proving = ["fuel-core-compression/fault-proving"]
parallel-executor = ["fuel-core/parallel-executor"]
12 changes: 12 additions & 0 deletions bin/fuel-core/src/cli/run.rs
Original file line number Diff line number Diff line change
Expand Up @@ -95,6 +95,9 @@ use url::Url;
#[cfg(feature = "rocksdb")]
use fuel_core::state::historical_rocksdb::StateRewindPolicy;

#[cfg(feature = "parallel-executor")]
use std::num::NonZeroUsize;

#[cfg(feature = "p2p")]
mod p2p;

Expand Down Expand Up @@ -196,6 +199,11 @@ pub struct Command {
#[arg(long = "native-executor-version", env)]
pub native_executor_version: Option<StateTransitionBytecodeVersion>,

/// Number of cores to use for the parallel executor.
#[cfg(feature = "parallel-executor")]
#[arg(long = "executor-number-of-cores", env, default_value = "1")]
pub executor_number_of_cores: NonZeroUsize,

/// The starting execution gas price for the network
#[cfg_attr(
feature = "production",
Expand Down Expand Up @@ -372,6 +380,8 @@ impl Command {
debug,
utxo_validation,
native_executor_version,
#[cfg(feature = "parallel-executor")]
executor_number_of_cores,
starting_gas_price,
gas_price_change_percent,
min_gas_price,
Expand Down Expand Up @@ -664,6 +674,8 @@ impl Command {
native_executor_version,
continue_on_error,
utxo_validation,
#[cfg(feature = "parallel-executor")]
executor_number_of_cores,
block_production: trigger,
predefined_blocks_path,
vm: VMConfig {
Expand Down
2 changes: 2 additions & 0 deletions crates/fuel-core/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ fuel-core-gas-price-service = { workspace = true }
fuel-core-importer = { workspace = true }
fuel-core-metrics = { workspace = true }
fuel-core-p2p = { workspace = true, optional = true }
fuel-core-parallel-executor = { workspace = true, optional = true }
fuel-core-poa = { workspace = true }
fuel-core-producer = { workspace = true }
fuel-core-relayer = { workspace = true, optional = true }
Expand Down Expand Up @@ -118,3 +119,4 @@ test-helpers = [
rocksdb-production = ["rocksdb", "rocksdb/jemalloc"]
wasm-executor = ["fuel-core-upgradable-executor/wasm-executor"]
fault-proving = ["fuel-core-compression/fault-proving"]
parallel-executor = ["fuel-core-parallel-executor"]
3 changes: 3 additions & 0 deletions crates/fuel-core/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,9 @@ pub use fuel_core_chain_config as chain_config;
#[cfg(feature = "p2p")]
#[doc(no_inline)]
pub use fuel_core_p2p as p2p;
#[cfg(feature = "parallel-executor")]
#[doc(no_inline)]
pub use fuel_core_parallel_executor as parallel_executor;
#[doc(no_inline)]
pub use fuel_core_producer as producer;
#[cfg(feature = "relayer")]
Expand Down
6 changes: 6 additions & 0 deletions crates/fuel-core/src/service/adapters.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,8 @@ use fuel_core_consensus_module::{
use fuel_core_executor::executor::OnceTransactionsSource;
use fuel_core_gas_price_service::v1::service::LatestGasPrice;
use fuel_core_importer::ImporterResult;
// #[cfg(feature = "parallel-executor")]
// use fuel_core_parallel_executor::executor::Executor;
use fuel_core_poa::ports::BlockSigner;
use fuel_core_services::stream::BoxStream;
use fuel_core_storage::transactional::Changes;
Expand Down Expand Up @@ -49,6 +51,7 @@ use fuel_core_types::{
signer::SignMode,
tai64::Tai64,
};
//#[cfg(not(feature = "parallel-executor"))]
use fuel_core_upgradable_executor::executor::Executor;
use std::sync::Arc;

Expand Down Expand Up @@ -346,6 +349,9 @@ impl ExecutorAdapter {
pub fn new(
database: Database,
relayer_database: Database<Relayer>,
// #[cfg(feature = "parallel-executor")]
// config: fuel_core_parallel_executor::config::Config,
// #[cfg(not(feature = "parallel-executor"))]
config: fuel_core_upgradable_executor::config::Config,
) -> Self {
let executor = Executor::new(database, relayer_database, config);
Expand Down
7 changes: 7 additions & 0 deletions crates/fuel-core/src/service/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,9 @@ use crate::{
},
};

#[cfg(feature = "parallel-executor")]
use std::num::NonZeroUsize;

#[derive(Clone, Debug)]
pub struct Config {
pub graphql_config: GraphQLConfig,
Expand All @@ -54,6 +57,8 @@ pub struct Config {
// default to false until downstream consumers stabilize
pub utxo_validation: bool,
pub native_executor_version: Option<StateTransitionBytecodeVersion>,
#[cfg(feature = "parallel-executor")]
pub executor_number_of_cores: NonZeroUsize,
pub block_production: Trigger,
pub predefined_blocks_path: Option<PathBuf>,
pub vm: VMConfig,
Expand Down Expand Up @@ -175,6 +180,8 @@ impl Config {
debug: true,
utxo_validation,
native_executor_version: Some(native_executor_version),
#[cfg(feature = "parallel-executor")]
executor_number_of_cores: NonZeroUsize::new(1).expect("1 is not zero"),
snapshot_reader,
block_production: Trigger::Instant,
predefined_blocks_path: None,
Expand Down
17 changes: 12 additions & 5 deletions crates/fuel-core/src/service/sub_services.rs
Original file line number Diff line number Diff line change
Expand Up @@ -102,14 +102,21 @@ pub fn init_sub_services(

let last_height = *last_block_header.height();

let upgradable_executor_config = fuel_core_upgradable_executor::config::Config {
backtrace: config.vm.backtrace,
utxo_validation_default: config.utxo_validation,
native_executor_version: config.native_executor_version,
};
let executor = ExecutorAdapter::new(
database.on_chain().clone(),
database.relayer().clone(),
fuel_core_upgradable_executor::config::Config {
backtrace: config.vm.backtrace,
utxo_validation_default: config.utxo_validation,
native_executor_version: config.native_executor_version,
},
// #[cfg(not(feature = "parallel-executor"))]
upgradable_executor_config,
// #[cfg(feature = "parallel-executor")]
// fuel_core_parallel_executor::config::Config {
// number_of_cores: config.executor_number_of_cores,
// executor_config: upgradable_executor_config,
// },
);
let import_result_provider =
ImportResultProvider::new(database.on_chain().clone(), executor.clone());
Expand Down
19 changes: 19 additions & 0 deletions crates/services/parallel-executor/Cargo.toml
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
[package]
name = "fuel-core-parallel-executor"
version = { workspace = true }
authors = { workspace = true }
edition = { workspace = true }
homepage = { workspace = true }
keywords = ["blockchain", "fuel", "fuel-vm", "parallel"]
license = { workspace = true }
repository = { workspace = true }
description = "Fuel Block Parallel Executor"

[dependencies]
fuel-core-storage = { workspace = true, features = ["std"] }
fuel-core-types = { workspace = true, features = ["std"] }
fuel-core-upgradable-executor = { workspace = true, features = ["std"] }
tokio = { workspace = true, features = ["rt-multi-thread"] }

[features]
wasm-executor = ["fuel-core-upgradable-executor/wasm-executor"]
19 changes: 19 additions & 0 deletions crates/services/parallel-executor/src/config.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
use fuel_core_upgradable_executor::config::Config as ExecutorConfig;
use std::num::NonZeroUsize;

#[derive(Clone, Debug)]
pub struct Config {
/// The number of cores to use for the block execution.
pub number_of_cores: NonZeroUsize,
/// See [`fuel_core_upgradable_executor::config::Config`].
pub executor_config: ExecutorConfig,
}

impl Default for Config {
fn default() -> Self {
Self {
number_of_cores: NonZeroUsize::new(1).expect("The value is not zero; qed"),
executor_config: Default::default(),
}
}
}
114 changes: 114 additions & 0 deletions crates/services/parallel-executor/src/executor.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,114 @@
use crate::config::Config;
use fuel_core_storage::transactional::Changes;
use fuel_core_types::{
blockchain::block::Block,
fuel_tx::Transaction,
services::{
block_producer::Components,
executor::{
ExecutionResult,
Result as ExecutorResult,
TransactionExecutionStatus,
ValidationResult,
},
Uncommitted,
},
};
use fuel_core_upgradable_executor::{
executor::Executor as UpgradableExecutor,
native_executor::ports::TransactionsSource,
};
use std::{
num::NonZeroUsize,
sync::{
Arc,
RwLock,
},
};
use tokio::runtime::Runtime;

#[cfg(feature = "wasm-executor")]
use fuel_core_upgradable_executor::error::UpgradableError;

#[cfg(feature = "wasm-executor")]
use fuel_core_types::fuel_merkle::common::Bytes32;

pub struct Executor<S, R> {
_executor: Arc<RwLock<UpgradableExecutor<S, R>>>,
runtime: Option<Runtime>,
_number_of_cores: NonZeroUsize,
}

// Shutdown the tokio runtime to avoid panic if executor is already
// used from another tokio runtime
impl<S, R> Drop for Executor<S, R> {
fn drop(&mut self) {
if let Some(runtime) = self.runtime.take() {
runtime.shutdown_background();
}
}
}

impl<S, R> Executor<S, R> {
pub fn new(
storage_view_provider: S,
relayer_view_provider: R,
config: Config,
) -> Self {
let executor = UpgradableExecutor::new(
storage_view_provider,
relayer_view_provider,
config.executor_config,
);
let runtime = tokio::runtime::Builder::new_multi_thread()
.worker_threads(config.number_of_cores.get())
.enable_all()
.build()
.unwrap();
let number_of_cores = config.number_of_cores;

Self {
_executor: Arc::new(RwLock::new(executor)),
runtime: Some(runtime),
_number_of_cores: number_of_cores,
}
}
}

impl<S, R> Executor<S, R> {
/// Produces the block and returns the result of the execution without committing the changes.
pub fn produce_without_commit_with_source<TxSource>(
&self,
_components: Components<TxSource>,
) -> ExecutorResult<Uncommitted<ExecutionResult, Changes>>
where
TxSource: TransactionsSource + Send + Sync + 'static,
{
unimplemented!("Not implemented yet");
}

pub fn validate(
&self,
_block: &Block,
) -> ExecutorResult<Uncommitted<ValidationResult, Changes>> {
unimplemented!("Not implemented yet");
}

#[cfg(feature = "wasm-executor")]
pub fn validate_uploaded_wasm(
&self,
_wasm_root: &Bytes32,
) -> Result<(), UpgradableError> {
unimplemented!("Not implemented yet");
}

/// Executes the block and returns the result of the execution without committing
/// the changes in the dry run mode.
pub fn dry_run(
&self,
_component: Components<Vec<Transaction>>,
_utxo_validation: Option<bool>,
) -> ExecutorResult<Vec<TransactionExecutionStatus>> {
unimplemented!("Not implemented yet");
}
}
2 changes: 2 additions & 0 deletions crates/services/parallel-executor/src/lib.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
pub mod config;
pub mod executor;
2 changes: 2 additions & 0 deletions crates/services/upgradable-executor/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,8 @@ pub mod config;
pub mod error;
pub mod executor;

pub use fuel_core_executor as native_executor;

#[cfg(feature = "wasm-executor")]
pub mod instance;

Expand Down

0 comments on commit b501db2

Please sign in to comment.