From 89acec2212b9e098517bc9ae7725a095a8d52a67 Mon Sep 17 00:00:00 2001 From: nicolas <48695862+merklefruit@users.noreply.github.com> Date: Sun, 22 Sep 2024 21:25:23 +0200 Subject: [PATCH] feat(rollup): engine api rpc --- .gitignore | 3 + Cargo.lock | 120 +++++++-- Cargo.toml | 6 +- bin/op-rs/src/main.rs | 2 +- crates/net/src/discovery/driver.rs | 8 +- crates/net/src/driver.rs | 1 + crates/net/src/gossip/driver.rs | 8 + crates/rollup/Cargo.toml | 6 + crates/rollup/src/driver/context/mod.rs | 9 +- .../rollup/src/driver/context/standalone.rs | 24 +- crates/rollup/src/driver/mod.rs | 39 +-- crates/rollup/src/engine/mod.rs | 148 +++++++++++ crates/rollup/src/engine/transport.rs | 241 ++++++++++++++++++ crates/rollup/src/lib.rs | 3 + crates/rollup/src/validator.rs | 71 ++---- 15 files changed, 583 insertions(+), 106 deletions(-) create mode 100644 crates/rollup/src/engine/mod.rs create mode 100644 crates/rollup/src/engine/transport.rs diff --git a/.gitignore b/.gitignore index 3b7fd9c..601673c 100644 --- a/.gitignore +++ b/.gitignore @@ -32,3 +32,6 @@ lcov.info # Rust bug report rustc-ice-* + +# Used for testing +jwt.hex diff --git a/Cargo.lock b/Cargo.lock index 9c73094..9fc1e6f 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -142,6 +142,7 @@ dependencies = [ "alloy-core", "alloy-eips", "alloy-genesis", + "alloy-json-rpc", "alloy-network", "alloy-provider", "alloy-pubsub", @@ -4007,10 +4008,10 @@ dependencies = [ "kona-primitives", "lru", "miniz_oxide", - "op-alloy-consensus", - "op-alloy-genesis", - "op-alloy-protocol", - "op-alloy-rpc-types-engine", + "op-alloy-consensus 0.2.12 (registry+https://github.com/rust-lang/crates.io-index)", + "op-alloy-genesis 0.2.12 (registry+https://github.com/rust-lang/crates.io-index)", + "op-alloy-protocol 0.2.12 (registry+https://github.com/rust-lang/crates.io-index)", + "op-alloy-rpc-types-engine 0.2.12 (registry+https://github.com/rust-lang/crates.io-index)", "reqwest", "serde", "thiserror 1.0.63 (git+https://github.com/quartiq/thiserror?branch=no-std)", @@ -4029,9 +4030,9 @@ dependencies = [ "alloy-rlp", "anyhow", "c-kzg", - "op-alloy-consensus", - "op-alloy-genesis", - "op-alloy-protocol", + "op-alloy-consensus 0.2.12 (registry+https://github.com/rust-lang/crates.io-index)", + "op-alloy-genesis 0.2.12 (registry+https://github.com/rust-lang/crates.io-index)", + "op-alloy-protocol 0.2.12 (registry+https://github.com/rust-lang/crates.io-index)", "revm", "serde", "sha2 0.10.8", @@ -4050,7 +4051,7 @@ dependencies = [ "hashbrown 0.14.5", "kona-derive", "kona-primitives", - "op-alloy-protocol", + "op-alloy-protocol 0.2.12 (registry+https://github.com/rust-lang/crates.io-index)", "parking_lot 0.12.3", "reth", "tracing", @@ -5194,6 +5195,21 @@ dependencies = [ "spin 0.9.8", ] +[[package]] +name = "op-alloy-consensus" +version = "0.2.12" +source = "git+https://github.com/alloy-rs/op-alloy?rev=f6d9d72#f6d9d723fff605c8b87a95548d4eb56338dc2c1e" +dependencies = [ + "alloy-consensus", + "alloy-eips", + "alloy-primitives", + "alloy-rlp", + "alloy-serde", + "derive_more", + "serde", + "spin 0.9.8", +] + [[package]] name = "op-alloy-genesis" version = "0.2.12" @@ -5208,6 +5224,19 @@ dependencies = [ "serde_repr", ] +[[package]] +name = "op-alloy-genesis" +version = "0.2.12" +source = "git+https://github.com/alloy-rs/op-alloy?rev=f6d9d72#f6d9d723fff605c8b87a95548d4eb56338dc2c1e" +dependencies = [ + "alloy-consensus", + "alloy-eips", + "alloy-primitives", + "alloy-sol-types", + "serde", + "serde_repr", +] + [[package]] name = "op-alloy-protocol" version = "0.2.12" @@ -5220,11 +5249,41 @@ dependencies = [ "alloy-rlp", "alloy-serde", "hashbrown 0.14.5", - "op-alloy-consensus", - "op-alloy-genesis", + "op-alloy-consensus 0.2.12 (registry+https://github.com/rust-lang/crates.io-index)", + "op-alloy-genesis 0.2.12 (registry+https://github.com/rust-lang/crates.io-index)", "serde", ] +[[package]] +name = "op-alloy-protocol" +version = "0.2.12" +source = "git+https://github.com/alloy-rs/op-alloy?rev=f6d9d72#f6d9d723fff605c8b87a95548d4eb56338dc2c1e" +dependencies = [ + "alloy-consensus", + "alloy-eips", + "alloy-primitives", + "alloy-rlp", + "alloy-serde", + "hashbrown 0.14.5", + "op-alloy-consensus 0.2.12 (git+https://github.com/alloy-rs/op-alloy?rev=f6d9d72)", + "op-alloy-genesis 0.2.12 (git+https://github.com/alloy-rs/op-alloy?rev=f6d9d72)", + "serde", +] + +[[package]] +name = "op-alloy-provider" +version = "0.2.12" +source = "git+https://github.com/alloy-rs/op-alloy?rev=f6d9d72#f6d9d723fff605c8b87a95548d4eb56338dc2c1e" +dependencies = [ + "alloy-network", + "alloy-primitives", + "alloy-provider", + "alloy-rpc-types-engine", + "alloy-transport", + "async-trait", + "op-alloy-rpc-types-engine 0.2.12 (git+https://github.com/alloy-rs/op-alloy?rev=f6d9d72)", +] + [[package]] name = "op-alloy-rpc-types" version = "0.2.12" @@ -5238,7 +5297,7 @@ dependencies = [ "alloy-serde", "cfg-if", "hashbrown 0.14.5", - "op-alloy-consensus", + "op-alloy-consensus 0.2.12 (registry+https://github.com/rust-lang/crates.io-index)", "serde", "serde_json", ] @@ -5254,9 +5313,25 @@ dependencies = [ "alloy-rpc-types-engine", "alloy-serde", "derive_more", - "op-alloy-consensus", - "op-alloy-genesis", - "op-alloy-protocol", + "op-alloy-consensus 0.2.12 (registry+https://github.com/rust-lang/crates.io-index)", + "op-alloy-genesis 0.2.12 (registry+https://github.com/rust-lang/crates.io-index)", + "op-alloy-protocol 0.2.12 (registry+https://github.com/rust-lang/crates.io-index)", + "serde", +] + +[[package]] +name = "op-alloy-rpc-types-engine" +version = "0.2.12" +source = "git+https://github.com/alloy-rs/op-alloy?rev=f6d9d72#f6d9d723fff605c8b87a95548d4eb56338dc2c1e" +dependencies = [ + "alloy-eips", + "alloy-primitives", + "alloy-rpc-types-engine", + "alloy-serde", + "derive_more", + "op-alloy-consensus 0.2.12 (git+https://github.com/alloy-rs/op-alloy?rev=f6d9d72)", + "op-alloy-genesis 0.2.12 (git+https://github.com/alloy-rs/op-alloy?rev=f6d9d72)", + "op-alloy-protocol 0.2.12 (git+https://github.com/alloy-rs/op-alloy?rev=f6d9d72)", "serde", ] @@ -8072,7 +8147,7 @@ dependencies = [ "alloy-serde", "jsonrpsee-types", "op-alloy-rpc-types", - "op-alloy-rpc-types-engine", + "op-alloy-rpc-types-engine 0.2.12 (registry+https://github.com/rust-lang/crates.io-index)", ] [[package]] @@ -8586,9 +8661,10 @@ dependencies = [ "kona-derive", "kona-providers", "metrics-exporter-prometheus", - "op-alloy-genesis", - "op-alloy-protocol", - "op-alloy-rpc-types-engine", + "op-alloy-genesis 0.2.12 (registry+https://github.com/rust-lang/crates.io-index)", + "op-alloy-protocol 0.2.12 (registry+https://github.com/rust-lang/crates.io-index)", + "op-alloy-provider", + "op-alloy-rpc-types-engine 0.2.12 (git+https://github.com/alloy-rs/op-alloy?rev=f6d9d72)", "reqwest", "reth", "reth-execution-types", @@ -8596,7 +8672,9 @@ dependencies = [ "reth-node-api", "serde_json", "superchain", + "thiserror 1.0.63 (registry+https://github.com/rust-lang/crates.io-index)", "tokio", + "tower 0.4.13", "tracing", "tracing-subscriber", "url", @@ -8999,8 +9077,8 @@ dependencies = [ "alloy-rlp", "eyre", "kona-derive", - "op-alloy-genesis", - "op-alloy-protocol", + "op-alloy-genesis 0.2.12 (registry+https://github.com/rust-lang/crates.io-index)", + "op-alloy-protocol 0.2.12 (registry+https://github.com/rust-lang/crates.io-index)", "op-alloy-rpc-types", "rand", "tracing", @@ -9432,7 +9510,7 @@ dependencies = [ "alloy-primitives", "hashbrown 0.14.5", "lazy_static", - "op-alloy-genesis", + "op-alloy-genesis 0.2.12 (registry+https://github.com/rust-lang/crates.io-index)", "serde", "serde_json", ] diff --git a/Cargo.toml b/Cargo.toml index 1ef0972..4da6bcb 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -53,6 +53,7 @@ alloy = { version = "0.3.6", features = [ "consensus", "rpc-types", "rpc-types-engine", + "json-rpc", "network", "ssz" ] } @@ -60,8 +61,10 @@ alloy-primitives = { version = "0.8", features = ["serde"] } alloy-rlp = "0.3" op-alloy-protocol = { version = "0.2.12", default-features = false } op-alloy-rpc-types = { version = "0.2.12", default-features = false } -op-alloy-rpc-types-engine = { version = "0.2.12", default-features = false } op-alloy-genesis = { version = "0.2.12", default-features = false } +# TODO: Use a new version once it's released +op-alloy-rpc-types-engine = { git = "https://github.com/alloy-rs/op-alloy", rev = "f6d9d72", default-features = false } +op-alloy-provider = { git = "https://github.com/alloy-rs/op-alloy", rev = "f6d9d72", default-features = false } # Tokio tokio = { version = "1.21", default-features = false } @@ -110,6 +113,7 @@ hashbrown = "0.14.5" parking_lot = "0.12.3" unsigned-varint = "0.8.0" rand = { version = "0.8.3", features = ["small_rng"], default-features = false } +thiserror = "1.0.63" url = "2.5.2" [workspace.metadata.docs.rs] diff --git a/bin/op-rs/src/main.rs b/bin/op-rs/src/main.rs index c2ad6b9..4416e20 100644 --- a/bin/op-rs/src/main.rs +++ b/bin/op-rs/src/main.rs @@ -38,7 +38,7 @@ fn main() -> Result<()> { let cfg = hera_args.get_l2_config()?; let node = EthereumNode::default(); - let hera = move |ctx| async { Ok(Driver::exex(ctx, hera_args, cfg).start()) }; + let hera = move |ctx| async { Ok(Driver::exex(ctx, hera_args, cfg).await.start()) }; let handle = builder.node(node).install_exex(HERA_EXEX_ID, hera).launch().await?; handle.wait_for_node_exit().await } else { diff --git a/crates/net/src/discovery/driver.rs b/crates/net/src/discovery/driver.rs index 489ca41..f5b9bd8 100644 --- a/crates/net/src/discovery/driver.rs +++ b/crates/net/src/discovery/driver.rs @@ -1,7 +1,7 @@ //! Discovery Module. use eyre::Result; -use std::time::Duration; +use std::{fmt::Debug, time::Duration}; use tokio::{ sync::mpsc::{channel, Receiver}, time::sleep, @@ -26,6 +26,12 @@ pub struct DiscoveryDriver { pub chain_id: u64, } +impl Debug for DiscoveryDriver { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + f.debug_struct("DiscoveryDriver").field("chain_id", &self.chain_id).finish() + } +} + impl DiscoveryDriver { /// Returns a new [DiscoveryBuilder] instance. pub fn builder() -> DiscoveryBuilder { diff --git a/crates/net/src/driver.rs b/crates/net/src/driver.rs index 191312f..ea4b135 100644 --- a/crates/net/src/driver.rs +++ b/crates/net/src/driver.rs @@ -15,6 +15,7 @@ use tokio::{select, sync::watch}; /// There are two core services that are run by the driver: /// - Block gossip through Gossipsub. /// - Peer discovery with `discv5`. +#[derive(Debug)] pub struct NetworkDriver { /// Channel to receive unsafe blocks. pub(crate) unsafe_block_recv: Option>, diff --git a/crates/net/src/gossip/driver.rs b/crates/net/src/gossip/driver.rs index bd47a4a..2ad84a3 100644 --- a/crates/net/src/gossip/driver.rs +++ b/crates/net/src/gossip/driver.rs @@ -1,5 +1,7 @@ //! Consensus-layer gossipsub driver for Optimism. +use std::fmt::Debug; + use crate::gossip::{ behaviour::Behaviour, event::Event, @@ -20,6 +22,12 @@ pub struct GossipDriver { pub handler: BlockHandler, } +impl Debug for GossipDriver { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + f.debug_struct("GossipDriver").field("addr", &self.addr).finish() + } +} + impl GossipDriver { /// Creates a new [GossipDriver] instance. pub fn new(swarm: Swarm, addr: Multiaddr, handler: BlockHandler) -> Self { diff --git a/crates/rollup/Cargo.toml b/crates/rollup/Cargo.toml index 3e78d29..85a3f33 100644 --- a/crates/rollup/Cargo.toml +++ b/crates/rollup/Cargo.toml @@ -10,6 +10,7 @@ keywords.workspace = true categories.workspace = true [dependencies] +# Workspace kona-providers.workspace = true # OP Stack Dependencies @@ -17,6 +18,7 @@ kona-derive.workspace = true op-alloy-genesis.workspace = true op-alloy-protocol.workspace = true op-alloy-rpc-types-engine.workspace = true +op-alloy-provider.workspace = true superchain = { workspace = true, default-features = false } # Reth Dependencies @@ -29,6 +31,9 @@ reth-execution-types.workspace = true tracing-subscriber = { version = "0.3.18", features = ["env-filter", "fmt"] } metrics-exporter-prometheus = { version = "0.15.3", features = ["http-listener"] } +# Async +tower = "0.4" + # Misc url.workspace = true reqwest.workspace = true @@ -41,6 +46,7 @@ tokio.workspace = true futures.workspace = true alloy.workspace = true hashbrown.workspace = true +thiserror.workspace = true [features] default = ["online"] diff --git a/crates/rollup/src/driver/context/mod.rs b/crates/rollup/src/driver/context/mod.rs index 3492f62..786436c 100644 --- a/crates/rollup/src/driver/context/mod.rs +++ b/crates/rollup/src/driver/context/mod.rs @@ -79,8 +79,13 @@ impl Blocks { *self.0.last_key_value().expect("Blocks should have at least one block").0 } - /// Returns the block at the fork point of the chain. - pub fn fork_block(&self) -> BlockNumber { + /// Returns a reference to the block at the tip of the chain. + pub fn tip_block(&self) -> &Block { + self.0.get(&self.tip()).expect("Blocks should have at least one block") + } + + /// Returns the block number at the fork point of the chain (the block before the first). + pub fn fork_block_number(&self) -> BlockNumber { let first = self.0.first_key_value().expect("Blocks should have at least one block").0; first.saturating_sub(1) } diff --git a/crates/rollup/src/driver/context/standalone.rs b/crates/rollup/src/driver/context/standalone.rs index 269f537..4781c0f 100644 --- a/crates/rollup/src/driver/context/standalone.rs +++ b/crates/rollup/src/driver/context/standalone.rs @@ -17,7 +17,7 @@ use tokio::{ sync::mpsc::{self, error::SendError}, task::JoinHandle, }; -use tracing::{debug, error, warn}; +use tracing::{error, warn}; use url::Url; use super::{Blocks, ChainNotification, DriverContext}; @@ -51,17 +51,11 @@ pub struct StandaloneHeraContext { impl StandaloneHeraContext { /// Create a new standalone context that polls for new chains. pub async fn new(l1_rpc_url: Url) -> TransportResult { - if l1_rpc_url.scheme().contains("http") { - debug!("Polling for new blocks via HTTP"); - Self::with_http_poller(l1_rpc_url).await - } else if l1_rpc_url.scheme().contains("ws") { - debug!("Subscribing to new blocks via websocket"); - Self::with_ws_subscriber(l1_rpc_url).await - } else if l1_rpc_url.scheme().contains("file") { - debug!("Subscribing to new blocks via IPC"); - Self::with_ipc_subscriber(l1_rpc_url).await - } else { - Err(TransportErrorKind::custom_str("Unsupported URL scheme")) + match l1_rpc_url.scheme() { + "http" | "https" => Self::with_http_poller(l1_rpc_url).await, + "ws" | "wss" => Self::with_ws_subscriber(l1_rpc_url).await, + "file" => Self::with_ipc_subscriber(l1_rpc_url).await, + _ => Err(TransportErrorKind::custom_str("Unsupported URL scheme")), } } @@ -197,10 +191,6 @@ impl StandaloneHeraContext { #[async_trait] impl DriverContext for StandaloneHeraContext { async fn recv_notification(&mut self) -> Option { - // TODO: is it ok to skip fetching full txs and receipts here assuming the node will - // have a fallback online RPC for that downstream? The driver and provider should be - // generic but currently are very coupled to the node mode (standalone vs exex). - let block = self.new_block_rx.recv().await?; let block_num = block.header.number; @@ -208,7 +198,7 @@ impl DriverContext for StandaloneHeraContext { entry.insert(block.header.hash, block.clone()); if block_num <= self.l1_tip { - todo!("handle reorgs"); + todo!("handle L1 reorgs"); } else { self.l1_tip = block_num; diff --git a/crates/rollup/src/driver/mod.rs b/crates/rollup/src/driver/mod.rs index 82d4fa9..711b440 100644 --- a/crates/rollup/src/driver/mod.rs +++ b/crates/rollup/src/driver/mod.rs @@ -53,7 +53,7 @@ pub struct Driver { impl Driver, InMemoryChainProvider, LayeredBlobProvider> { /// Create a new Hera Execution Extension Driver - pub fn exex(ctx: ExExContext, args: HeraArgsExt, cfg: Arc) -> Self { + pub async fn exex(ctx: ExExContext, args: HeraArgsExt, cfg: Arc) -> Self { let chain_provider = InMemoryChainProvider::with_capacity(args.l1_chain_cache_size); let blob_provider = LayeredBlobProvider::new( args.l1_beacon_client_url.clone(), @@ -65,7 +65,7 @@ impl Driver, InMemoryChainProvider, La // to the derivation pipeline's L1 chain provider. let exex_ctx = ExExHeraContext::new(ctx, chain_provider.clone()); - Self::with_components(exex_ctx, args, cfg, chain_provider, blob_provider) + Self::with_components(exex_ctx, args, cfg, chain_provider, blob_provider).await } } @@ -77,7 +77,7 @@ impl Driver { .with_primary(args.l1_beacon_client_url.as_str().trim_end_matches('/').to_string()) .with_fallback( args.l1_blob_archiver_url - .clone() + .as_ref() .map(|url| url.as_str().trim_end_matches('/').to_string()), ) .build(); @@ -87,7 +87,7 @@ impl Driver { // from the L1 chain provider directly. let standalone_ctx = StandaloneHeraContext::new(args.l1_rpc_url.clone()).await?; - Ok(Self::with_components(standalone_ctx, args, cfg, chain_provider, blob_provider)) + Ok(Self::with_components(standalone_ctx, args, cfg, chain_provider, blob_provider).await) } } @@ -98,30 +98,31 @@ where BP: BlobProvider + Clone + Send + Sync + Debug + 'static, { /// Create a new Hera Driver with the provided components. - fn with_components( + async fn with_components( ctx: DC, args: HeraArgsExt, cfg: Arc, l1_chain_provider: CP, blob_provider: BP, ) -> Self { - let cursor = SyncCursor::new(cfg.channel_timeout); let validator: Box = match args.validation_mode { - ValidationMode::Trusted => Box::new(TrustedValidator::new_http( - args.l2_rpc_url.clone(), - cfg.canyon_time.unwrap_or(0), - )), - ValidationMode::EngineApi => Box::new(EngineApiValidator::new_http( - args.l2_engine_api_url.expect("Missing L2 engine API URL"), - match args.l2_engine_jwt_secret.as_ref() { - Some(fpath) => JwtSecret::from_file(fpath).expect("Invalid L2 JWT secret file"), - None => panic!("Missing L2 engine JWT secret"), - }, - )), + ValidationMode::Trusted => { + let canyon_activation = cfg.canyon_time.unwrap_or(0); + Box::new(TrustedValidator::new_http(args.l2_rpc_url.clone(), canyon_activation)) + } + ValidationMode::EngineApi => { + let engine_url = args.l2_engine_api_url.expect("Missing L2 engine API URL"); + let jwt_secret_file = args.l2_engine_jwt_secret.expect("Missing L2 engine JWT"); + let jwt_secret = JwtSecret::from_file(&jwt_secret_file).expect("Invalid L2 JWT"); + let engine = EngineApiValidator::new(engine_url, jwt_secret).await; + Box::new(engine.expect("Failed to create Engine API validator")) + } }; + + let cursor = SyncCursor::new(cfg.channel_timeout); let l2_chain_provider = AlloyL2ChainProvider::new_http(args.l2_rpc_url, cfg.clone()); - Self { cfg, ctx, l1_chain_provider, blob_provider, l2_chain_provider, cursor, validator } + Self { cfg, ctx, l1_chain_provider, l2_chain_provider, blob_provider, cursor, validator } } /// Wait for the L2 genesis' corresponding L1 block to be available in the L1 chain. @@ -249,7 +250,7 @@ where // The reverted chain contains the list of blocks that were invalidated by the // reorg. we need to reset the cursor to the last canonical block, which corresponds // to the block before the reorg happened. - let fork_block = reverted_chain.fork_block(); + let fork_block = reverted_chain.fork_block_number(); // Find the last known L2 block that is still valid after the reorg, // and reset the cursor and pipeline to it. diff --git a/crates/rollup/src/engine/mod.rs b/crates/rollup/src/engine/mod.rs new file mode 100644 index 0000000..7afae49 --- /dev/null +++ b/crates/rollup/src/engine/mod.rs @@ -0,0 +1,148 @@ +use alloy::{ + network::AnyNetwork, + primitives::B256, + providers::RootProvider, + rpc::client::ClientBuilder, + transports::{BoxTransport, TransportErrorKind, TransportResult}, +}; +use op_alloy_provider::ext::engine::OpEngineApi; +use op_alloy_rpc_types_engine::{ + OptimismExecutionPayloadEnvelopeV3, OptimismExecutionPayloadEnvelopeV4, + OptimismPayloadAttributes, ProtocolVersion, SuperchainSignal, +}; +use reth::{ + payload::PayloadId, + rpc::types::{ + engine::{ + ExecutionPayloadEnvelopeV2, ExecutionPayloadInputV2, ForkchoiceState, + ForkchoiceUpdated, JwtSecret, PayloadStatus, + }, + ExecutionPayload, + }, +}; +use reth_node_api::EngineApiMessageVersion; +use url::Url; + +mod transport; +use transport::AuthenticatedTransportConnect; + +#[derive(Debug, Clone)] +pub struct EngineApiClient { + auth_provider: RootProvider, +} + +impl EngineApiClient { + /// Creates a new [`EngineApiClient`] instance. + pub async fn new(rpc_url: Url, jwt: JwtSecret) -> eyre::Result { + let auth_transport = AuthenticatedTransportConnect::new(rpc_url, jwt); + let client = ClientBuilder::default().connect_boxed(auth_transport).await?; + Ok(Self { auth_provider: RootProvider::<_, AnyNetwork>::new(client) }) + } + + /// Calls the `engine_newPayload` method on the engine API. + /// + /// - The version is determined by the execution payload version passed in. + /// - Returns the engine API message version and the payload status. + pub async fn new_payload( + &self, + payload: ExecutionPayload, + parent_beacon_block_root: Option, + ) -> TransportResult<(EngineApiMessageVersion, PayloadStatus)> { + match payload { + ExecutionPayload::V4(payload) => { + let parent_root = parent_beacon_block_root.ok_or_else(|| { + TransportErrorKind::custom_str("parent beacon root must be provided") + })?; + + Ok(( + EngineApiMessageVersion::V4, + self.auth_provider.new_payload_v4(payload, parent_root).await?, + )) + } + ExecutionPayload::V3(payload) => { + let parent_root = parent_beacon_block_root.ok_or_else(|| { + TransportErrorKind::custom_str("parent beacon root must be provided") + })?; + + Ok(( + EngineApiMessageVersion::V3, + self.auth_provider.new_payload_v3(payload, parent_root).await?, + )) + } + ExecutionPayload::V2(payload) => { + let input = ExecutionPayloadInputV2 { + execution_payload: payload.payload_inner, + withdrawals: Some(payload.withdrawals), + }; + + Ok((EngineApiMessageVersion::V2, self.auth_provider.new_payload_v2(input).await?)) + } + ExecutionPayload::V1(_) => { + Err(TransportErrorKind::custom_str("V1 payloads are not supported")) + } + } + } + + /// Calls the `engine_getPayload` method on the engine API. + /// + /// - The version is determined by the execution payload version passed in. + /// - Returns the execution payload envelope based on the version. + pub async fn get_payload( + &self, + message_version: EngineApiMessageVersion, + payload_id: PayloadId, + ) -> TransportResult { + match message_version { + EngineApiMessageVersion::V1 | EngineApiMessageVersion::V2 => { + Ok(GetPayloadResponse::V2(self.auth_provider.get_payload_v2(payload_id).await?)) + } + EngineApiMessageVersion::V3 => { + Ok(GetPayloadResponse::V3(self.auth_provider.get_payload_v3(payload_id).await?)) + } + EngineApiMessageVersion::V4 => { + Ok(GetPayloadResponse::V4(self.auth_provider.get_payload_v4(payload_id).await?)) + } + } + } + + /// Calls the `fork_choice_updated` method on the engine API. + /// + /// - The version is determined by the execution payload version passed in. + /// - Returns the fork choice updated response. + pub async fn fork_choice_updated( + &self, + message_version: EngineApiMessageVersion, + fork_choice_state: ForkchoiceState, + payload_attributes: Option, + ) -> TransportResult { + match message_version { + EngineApiMessageVersion::V4 | EngineApiMessageVersion::V3 => { + self.auth_provider + .fork_choice_updated_v3(fork_choice_state, payload_attributes) + .await + } + EngineApiMessageVersion::V1 | EngineApiMessageVersion::V2 => { + self.auth_provider + .fork_choice_updated_v2(fork_choice_state, payload_attributes) + .await + } + } + } + + /// Calls the `signal_superchain` method on the engine API. + /// + /// - Returns the latest protocol version supported by the execution engine. + pub async fn signal_superchain( + &self, + signal: SuperchainSignal, + ) -> TransportResult { + self.auth_provider.signal_superchain_v1(signal).await + } +} + +/// The response from the `engine_getPayload` versioned method. +pub enum GetPayloadResponse { + V2(ExecutionPayloadEnvelopeV2), + V3(OptimismExecutionPayloadEnvelopeV3), + V4(OptimismExecutionPayloadEnvelopeV4), +} diff --git a/crates/rollup/src/engine/transport.rs b/crates/rollup/src/engine/transport.rs new file mode 100644 index 0000000..c8fe31a --- /dev/null +++ b/crates/rollup/src/engine/transport.rs @@ -0,0 +1,241 @@ +use std::{ + sync::Arc, + task::{Context, Poll}, +}; + +use alloy::{ + providers::{IpcConnect, WsConnect}, + pubsub::{PubSubConnect, PubSubFrontend}, + rpc::json_rpc::{RequestPacket, ResponsePacket}, + transports::{ + http::{Http, ReqwestTransport}, + utils::guess_local_url, + Authorization, Pbf, TransportConnect, TransportError, TransportErrorKind, TransportFut, + }, +}; +use futures::FutureExt; +use reqwest::{ + header::{self, HeaderValue}, + Client, +}; +use reth::rpc::types::engine::{Claims, JwtSecret}; +use tokio::sync::RwLock; +use tower::Service; +use url::Url; + +/// An enum representing the different transports that can be used to connect to a runtime. +/// Only meant to be used internally by [`AuthenticatedTransport`]. +#[derive(Clone, Debug)] +pub enum InnerTransport { + /// HTTP transport + Http(ReqwestTransport), + /// `WebSocket` transport + Ws(PubSubFrontend), + /// IPC transport + Ipc(PubSubFrontend), +} + +impl InnerTransport { + /// Connects to a transport based on the given URL and JWT. + /// Returns an [`InnerTransport`] and the [`Claims`] generated from the jwt. + async fn connect(url: Url, jwt: JwtSecret) -> Result<(Self, Claims), AuthTransportError> { + match url.scheme() { + "http" | "https" => Self::connect_http(url, jwt), + "ws" | "wss" => Self::connect_ws(url, jwt).await, + "file" => Ok((Self::connect_ipc(url).await?, Claims::default())), + _ => Err(AuthTransportError::BadScheme(url.scheme().to_string())), + } + } + + /// Connects to an HTTP transport. + /// Returns an [`InnerTransport`] and the [Claims] generated from the jwt. + fn connect_http(url: Url, jwt: JwtSecret) -> Result<(Self, Claims), AuthTransportError> { + let mut client_builder = Client::builder().tls_built_in_root_certs(url.scheme() == "https"); + + // Add the JWT it to the headers if we can decode it. + let (auth, claims) = build_auth(jwt).map_err(AuthTransportError::InvalidJwt)?; + + let mut auth_value = HeaderValue::from_str(&auth.to_string()).expect("invalid string"); + auth_value.set_sensitive(true); + + let mut headers = header::HeaderMap::new(); + headers.insert(header::AUTHORIZATION, auth_value); + client_builder = client_builder.default_headers(headers); + + let client = client_builder.build().map_err(AuthTransportError::HttpConstructionError)?; + let inner = Self::Http(Http::with_client(client, url)); + + Ok((inner, claims)) + } + + /// Connects to a `WebSocket` transport. + /// Returns an [`InnerTransport`] and the [`Claims`] generated from the jwt. + async fn connect_ws(url: Url, jwt: JwtSecret) -> Result<(Self, Claims), AuthTransportError> { + // Add the JWT to the headers if we can decode it. + let (auth, claims) = build_auth(jwt).map_err(AuthTransportError::InvalidJwt)?; + + let ws = WsConnect { url: url.to_string(), auth: Some(auth) }; + + match ws.into_service().await { + Ok(ws) => Ok((Self::Ws(ws), claims)), + Err(e) => Err(AuthTransportError::TransportError(e, url.to_string())), + } + } + + /// Connects to an IPC transport. Returns an [`InnerTransport`]. + /// Does not return any [`Claims`] because IPC does not require them. + async fn connect_ipc(url: Url) -> Result { + // IPC, even for engine, typically does not require auth because it's local + let ipc = IpcConnect::new(url.to_string()); + + match ipc.into_service().await { + Ok(ipc) => Ok(Self::Ipc(ipc)), + Err(e) => Err(AuthTransportError::TransportError(e, url.to_string())), + } + } +} + +/// An error that can occur when creating an authenticated transport. +#[derive(Debug, thiserror::Error)] +pub enum AuthTransportError { + /// The JWT is invalid. + #[error("The JWT is invalid: {0}")] + InvalidJwt(eyre::Error), + /// The transport failed to connect. + #[error("The transport failed to connect to {1}, transport error: {0}")] + TransportError(TransportError, String), + /// The http client could not be built. + #[error("The http client could not be built")] + HttpConstructionError(reqwest::Error), + /// The scheme is invalid. + #[error("The URL scheme is invalid: {0}")] + BadScheme(String), +} + +/// An authenticated transport that can be used to send requests that contain a jwt bearer token. +#[derive(Debug, Clone)] +pub struct AuthenticatedTransport { + /// The inner actual transport used. + /// + /// Also contains the current claims being used. This is used to determine whether or not we + /// should create another client. + inner_and_claims: Arc>, + /// The current jwt being used. This is so we can recreate claims. + jwt: JwtSecret, + /// The current URL being used. This is so we can recreate the client if needed. + url: Url, +} + +impl AuthenticatedTransport { + /// Create a new builder with the given URL and JWT secret. + pub async fn connect(url: Url, jwt: JwtSecret) -> Result { + let (inner, claims) = InnerTransport::connect(url.clone(), jwt).await?; + Ok(Self { inner_and_claims: Arc::new(RwLock::new((inner, claims))), jwt, url }) + } + + /// Sends a request using the underlying transport. + /// + /// For sending the actual request, this action is delegated down to the underlying transport + /// through Tower's [`tower::Service::call`]. + /// + /// See tower's [`tower::Service`] trait for more information. + fn request(&self, req: RequestPacket) -> TransportFut<'static> { + let this = self.clone(); + + Box::pin(async move { + let mut inner_and_claims = this.inner_and_claims.write().await; + + // shift the iat forward by one second so there is some buffer time + let mut shifted_claims = inner_and_claims.1; + shifted_claims.iat -= 1; + + // if the claims are out of date, reset the inner transport + if !shifted_claims.is_within_time_window() { + match InnerTransport::connect(this.url, this.jwt).await { + Ok((new_inner, new_claims)) => *inner_and_claims = (new_inner, new_claims), + Err(e) => return Err(TransportErrorKind::custom(Box::new(e))), + } + } + + match inner_and_claims.0 { + InnerTransport::Http(ref mut http) => http.call(req), + InnerTransport::Ws(ref mut ws) => ws.call(req), + InnerTransport::Ipc(ref mut ipc) => ipc.call(req), + } + .await + }) + } +} + +/// Generate claims (iat with current timestamp). +/// This happens by default using the Default trait for Claims. +fn build_auth(secret: JwtSecret) -> eyre::Result<(Authorization, Claims)> { + let claims = Claims::default(); + let token = secret.encode(&claims)?; + let auth = Authorization::Bearer(token); + + Ok((auth, claims)) +} + +/// This specifies how to connect to an authenticated transport. +#[derive(Clone, Debug)] +pub struct AuthenticatedTransportConnect { + /// The URL to connect to. + url: Url, + /// The JWT secret used to authenticate the transport. + jwt: JwtSecret, +} + +impl AuthenticatedTransportConnect { + /// Create a new builder with the given URL. + pub const fn new(url: Url, jwt: JwtSecret) -> Self { + Self { url, jwt } + } +} + +impl TransportConnect for AuthenticatedTransportConnect { + type Transport = AuthenticatedTransport; + + fn is_local(&self) -> bool { + guess_local_url(&self.url) + } + + fn get_transport<'a: 'b, 'b>(&'a self) -> Pbf<'b, Self::Transport, TransportError> { + AuthenticatedTransport::connect(self.url.clone(), self.jwt) + .map(|res| match res { + Ok(transport) => Ok(transport), + Err(err) => { + Err(TransportError::Transport(TransportErrorKind::Custom(Box::new(err)))) + } + }) + .boxed() + } +} + +impl tower::Service for AuthenticatedTransport { + type Response = ResponsePacket; + type Error = TransportError; + type Future = TransportFut<'static>; + + fn poll_ready(&mut self, _cx: &mut Context<'_>) -> Poll> { + Poll::Ready(Ok(())) + } + + fn call(&mut self, req: RequestPacket) -> Self::Future { + self.request(req) + } +} + +impl tower::Service for &AuthenticatedTransport { + type Response = ResponsePacket; + type Error = TransportError; + type Future = TransportFut<'static>; + + fn poll_ready(&mut self, _cx: &mut Context<'_>) -> Poll> { + Poll::Ready(Ok(())) + } + + fn call(&mut self, req: RequestPacket) -> Self::Future { + self.request(req) + } +} diff --git a/crates/rollup/src/lib.rs b/crates/rollup/src/lib.rs index ed753fc..b9ab6c5 100644 --- a/crates/rollup/src/lib.rs +++ b/crates/rollup/src/lib.rs @@ -10,6 +10,9 @@ pub use driver::Driver; mod cli; pub use cli::HeraArgsExt; +mod engine; +pub use engine::EngineApiClient; + mod validator; pub use validator::AttributesValidator; diff --git a/crates/rollup/src/validator.rs b/crates/rollup/src/validator.rs index 69b656a..d29e55b 100644 --- a/crates/rollup/src/validator.rs +++ b/crates/rollup/src/validator.rs @@ -11,17 +11,16 @@ use alloy::{ use async_trait::async_trait; use eyre::{bail, eyre, Result}; use op_alloy_rpc_types_engine::{OptimismAttributesWithParent, OptimismPayloadAttributes}; -use reqwest::{ - header::{AUTHORIZATION, CONTENT_TYPE}, - Client, StatusCode, -}; use reth::rpc::types::{ - engine::{Claims, JwtSecret}, + engine::{ForkchoiceState, JwtSecret}, Header, }; -use tracing::error; +use reth_node_api::EngineApiMessageVersion; +use tracing::{error, warn}; use url::Url; +use crate::EngineApiClient; + /// AttributesValidator /// /// A trait that defines the interface for validating newly derived L2 attributes. @@ -132,55 +131,39 @@ impl AttributesValidator for TrustedValidator { /// The engine API will return a `VALID` or `INVALID` response. #[derive(Debug, Clone)] pub struct EngineApiValidator { - /// The engine API URL. - url: Url, - /// The reqwest client. - client: Client, - /// The JWT secret token for the engine API. - jwt_secret: JwtSecret, + client: EngineApiClient, } impl EngineApiValidator { /// Creates a new [`EngineApiValidator`] from the provided [Url] and [JwtSecret]. + /// + /// The inner client will work with either HTTP, Websocket, or IPC transport based + /// on the provided URL. #[allow(unused)] - pub fn new_http(url: Url, jwt: JwtSecret) -> Self { - Self { url, client: Client::new(), jwt_secret: jwt } + pub async fn new(url: Url, jwt: JwtSecret) -> eyre::Result { + Ok(Self { client: EngineApiClient::new(url, jwt).await? }) } } #[async_trait] impl AttributesValidator for EngineApiValidator { async fn validate(&self, attributes: &OptimismAttributesWithParent) -> Result { - let request_body = serde_json::json!({ - "id": 1, - "jsonrpc": "2.0", - "method": "engine_newPayloadV2", - "params": [attributes.attributes] - }); - - let claims = Claims::default(); - let jwt = self.jwt_secret.encode(&claims)?; - - let response = self - .client - .post(self.url.clone()) - .header(CONTENT_TYPE, "application/json") - .header(AUTHORIZATION, format!("Bearer {}", jwt)) - .json(&request_body) - .send() - .await?; - - let status = response.status(); - let body = response.json::().await?; - match status { - StatusCode::OK => Ok(body - .pointer("/result/status") - .and_then(|status| status.as_str()) - .map_or(false, |status| status == "VALID")), - _ => { - error!(?body, "Engine API returned status: {}", status); - bail!("Engine API returned status: {} and body: {:#?}", status, body); - } + // TODO: use the correct values + let fork_choice_state = ForkchoiceState { + head_block_hash: attributes.parent.block_info.hash, + finalized_block_hash: attributes.parent.block_info.hash, + safe_block_hash: attributes.parent.block_info.hash, + }; + + let version = EngineApiMessageVersion::V2; // TODO: Determine the correct version + let attributes = Some(attributes.attributes.clone()); + let fcu = self.client.fork_choice_updated(version, fork_choice_state, attributes).await?; + + if fcu.is_valid() { + Ok(true) + } else { + warn!(status = %fcu.payload_status, "Engine API returned invalid fork choice update"); + Ok(false) } } }