Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: install payload delay #134

Merged
merged 7 commits into from
Jan 22, 2025
Merged
Show file tree
Hide file tree
Changes from 5 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -235,6 +235,7 @@ serde_json = "1"
thiserror = "2"
futures = "0.3"
url = "2.5"
parking_lot = "0.12"

# misc-testing
rstest = "0.18.2"
14 changes: 14 additions & 0 deletions bin/odyssey/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ use eyre::Context;
use odyssey_node::{
broadcaster::periodic_broadcaster,
chainspec::OdysseyChainSpecParser,
delayed_resolve::{DelayedResolver, MAX_DELAY_INTO_SLOT},
forwarder::forward_raw_transactions,
node::OdysseyNode,
rpc::{EthApiExt, EthApiOverrideServer},
Expand All @@ -40,6 +41,7 @@ use reth_node_builder::{engine_tree_config::TreeConfig, EngineNodeLauncher, Node
use reth_optimism_cli::Cli;
use reth_optimism_node::{args::RollupArgs, node::OpAddOnsBuilder};
use reth_provider::{providers::BlockchainProvider2, CanonStateSubscriptions};
use std::time::Duration;
use tracing::{info, warn};

#[global_allocator]
Expand Down Expand Up @@ -110,6 +112,18 @@ fn main() {
ctx.modules.merge_configured(walltime.into_rpc())?;
info!(target: "reth::cli", "Walltime configured");

// wrap the getPayloadV3 method in a delay
let engine_module = ctx.auth_module.module_mut().clone();
let delay_into_slot = std::env::var("MAX_PAYLOAD_DELAY")
.ok()
.and_then(|val| val.parse::<u64>().map(Duration::from_millis).ok())
.unwrap_or(MAX_DELAY_INTO_SLOT);

let delayed_payload = DelayedResolver::new(engine_module, delay_into_slot);
delayed_payload.clone().spawn(ctx.provider().canonical_state_stream());
ctx.auth_module.replace_auth_methods(delayed_payload.into_rpc_module())?;
info!(target: "reth::cli", "Configured payload delay");

Ok(())
})
.launch_with_fn(|builder| {
Expand Down
4 changes: 4 additions & 0 deletions crates/node/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ reth-trie-common.workspace = true
reth-trie-db.workspace = true
reth-network.workspace = true
reth-network-types.workspace = true
reth-chain-state.workspace = true

alloy-consensus.workspace = true
alloy-eips.workspace = true
Expand All @@ -50,6 +51,9 @@ tokio.workspace = true
tracing.workspace = true
eyre.workspace = true
jsonrpsee.workspace = true
futures.workspace = true
parking_lot.workspace = true
serde.workspace = true

[lints]
workspace = true
140 changes: 140 additions & 0 deletions crates/node/src/delayed_resolve.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,140 @@
//! Helper that delays resolving the payload

use futures::{Stream, StreamExt};
use jsonrpsee::{
core::traits::ToRpcParams,
types::{error::INVALID_PARAMS_CODE, ErrorObject, Params},
MethodsError, RpcModule,
};
use parking_lot::Mutex;
use reth_chain_state::CanonStateNotification;
use serde::de::Error;
use serde_json::value::RawValue;
use std::{
sync::Arc,
time::{Duration, Instant},
};

/// Delay into the slot
pub const MAX_DELAY_INTO_SLOT: Duration = Duration::from_millis(500);

/// The getpayload fn we want to delay
pub const GET_PAYLOAD_V3: &str = "engine_getPayloadV3";

/// A helper that tracks the block clock timestamp and can delay resolving the payload to give the
/// payload builder more time to build a block.
#[derive(Debug, Clone)]
pub struct DelayedResolver {
inner: Arc<DelayedResolverInner>,
}

impl DelayedResolver {
/// Creates a new instance with the engine module and the duration we should target
pub fn new(engine_module: RpcModule<()>, max_delay_into_slot: Duration) -> Self {
Self {
inner: Arc::new(DelayedResolverInner {
last_block_time: Mutex::new(Instant::now()),
engine_module,
max_delay_into_slot,
}),
}
}

/// Listen for new blocks and track the local timestamp.
pub fn spawn<St>(self, mut st: St)
where
St: Stream<Item = CanonStateNotification> + Send + Unpin + 'static,
{
tokio::task::spawn(async move {
while st.next().await.is_some() {
*self.inner.last_block_time.lock() = Instant::now();
}
});
}

async fn call(&self, params: Params<'static>) -> Result<String, MethodsError> {
let last = *self.inner.last_block_time.lock();
let now = Instant::now();
// how far we're into the slot
let offset = now.duration_since(last);

if offset < self.inner.max_delay_into_slot {
// if we received the request before the max delay exceeded we can delay the request to
// give the payload builder more time to build the payload.
let delay = self.inner.max_delay_into_slot - offset;
tokio::time::sleep(delay).await;
}

let params = params
.as_str()
.ok_or_else(|| MethodsError::Parse(serde_json::Error::missing_field("payload id")))?;

self.inner.engine_module.call(GET_PAYLOAD_V3, PayloadParam(params.to_string())).await
}

/// Converts this type into a new [`RpcModule`] that delegates the get payload call.
pub fn into_rpc_module(self) -> RpcModule<()> {
let mut module = RpcModule::new(());
module
.register_async_method(GET_PAYLOAD_V3, move |params, _ctx, _| {
let value = self.clone();
async move {
value.call(params).await.map_err(|err| match err {
MethodsError::JsonRpc(err) => err,
err => ErrorObject::owned(
INVALID_PARAMS_CODE,
format!("invalid payload call: {:?}", err),
None::<()>,
),
})
}
})
.unwrap();

module
}
}

#[derive(Debug)]
struct DelayedResolverInner {
/// Tracks the time when the last block was emitted
last_block_time: Mutex<Instant>,
engine_module: RpcModule<()>,
/// By how much we want to delay getPayload into the slot
max_delay_into_slot: Duration,
}

struct PayloadParam(String);

impl ToRpcParams for PayloadParam {
fn to_rpc_params(self) -> Result<Option<Box<RawValue>>, serde_json::Error> {
RawValue::from_string(self.0).map(Some)
}
}

#[cfg(test)]
mod tests {
use super::*;
use alloy_rpc_types::engine::PayloadId;

#[tokio::test]
async fn test_delayed_forward() {
use jsonrpsee::{core::RpcResult, RpcModule};

let mut module = RpcModule::new(());
module
.register_method::<RpcResult<PayloadId>, _>(GET_PAYLOAD_V3, |params, _, _| {
params.one::<PayloadId>()
})
.unwrap();

let id = PayloadId::default();

let echo: PayloadId = module.call(GET_PAYLOAD_V3, [id]).await.unwrap();
assert_eq!(echo, id);

let delayer = DelayedResolver::new(module, MAX_DELAY_INTO_SLOT).into_rpc_module();
let echo: PayloadId = delayer.call(GET_PAYLOAD_V3, [id]).await.unwrap();
assert_eq!(echo, id);
}
}
1 change: 1 addition & 0 deletions crates/node/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@

pub mod broadcaster;
pub mod chainspec;
pub mod delayed_resolve;
pub mod evm;
pub mod forwarder;
pub mod node;
Expand Down
1 change: 1 addition & 0 deletions crates/wallet/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -310,6 +310,7 @@ impl<T> OdysseyWallet<T> {
Self { inner: Arc::new(inner) }
}

#[allow(clippy::missing_const_for_fn)]
fn chain_id(&self) -> ChainId {
self.inner.chain_id
}
Expand Down
Loading